StephanEwen commented on a change in pull request #13385:
URL: https://github.com/apache/flink/pull/13385#discussion_r488606549
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
##########
@@ -29,25 +35,94 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit test for {@link SplitFetcher}.
*/
public class SplitFetcherTest {
- private static final int NUM_SPLITS = 3;
- private static final int NUM_RECORDS_PER_SPLIT = 10_000;
- private static final int INTERRUPT_RECORDS_INTERVAL = 10;
- private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT *
NUM_SPLITS;
+
+ @Test
+ public void testNewFetcherIsIdle() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher =
createFetcher(new TestingSplitReader<>());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testFetcherNotIdleAfterSplitAdded() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher =
createFetcher(new TestingSplitReader<>());
+ final TestingSourceSplit split = new
TestingSourceSplit("test-split");
+
+ fetcher.addSplits(Collections.singletonList(split));
+
+ assertFalse(fetcher.isIdle());
+
+ // need to loop here because the internal wakeup flag handling
means we need multiple loops
+ while (fetcher.assignedSplits().isEmpty()) {
+ fetcher.runOnce();
+ assertFalse(fetcher.isIdle());
+ }
+ }
+
+ @Test
+ public void testIdleAfterFinishedSplitsEnqueued() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher =
createFetcherWithSplit(
+ "test-split", new
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testNotifiesWhenGoingIdle() {
+ final
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new
FutureCompletingBlockingQueue<>();
+ final SplitFetcher<Object, TestingSourceSplit> fetcher =
createFetcherWithSplit(
+ "test-split",
+ queue,
+ new
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ assertTrue(queue.getAvailabilityFuture().isDone());
Review comment:
Good idea, let's have both tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org