StephanEwen commented on a change in pull request #13385:
URL: https://github.com/apache/flink/pull/13385#discussion_r488606549



##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
##########
@@ -29,25 +35,94 @@
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-       private static final int NUM_SPLITS = 3;
-       private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-       private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-       private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * 
NUM_SPLITS;
+
+       @Test
+       public void testNewFetcherIsIdle() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(new TestingSplitReader<>());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testFetcherNotIdleAfterSplitAdded() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(new TestingSplitReader<>());
+               final TestingSourceSplit split = new 
TestingSourceSplit("test-split");
+
+               fetcher.addSplits(Collections.singletonList(split));
+
+               assertFalse(fetcher.isIdle());
+
+               // need to loop here because the internal wakeup flag handling 
means we need multiple loops
+               while (fetcher.assignedSplits().isEmpty()) {
+                       fetcher.runOnce();
+                       assertFalse(fetcher.isIdle());
+               }
+       }
+
+       @Test
+       public void testIdleAfterFinishedSplitsEnqueued() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split", new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testNotifiesWhenGoingIdle() {
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new 
FutureCompletingBlockingQueue<>();
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split",
+                       queue,
+                       new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+               assertTrue(queue.getAvailabilityFuture().isDone());

Review comment:
       Good idea, let's have both tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to