becketqin commented on a change in pull request #13385:
URL: https://github.com/apache/flink/pull/13385#discussion_r488508741



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
##########
@@ -275,4 +340,22 @@ private void setWakeUp(boolean value) {
                        wakeUp = value;
                }
        }
+
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       @SuppressWarnings("unchecked")
+       private static CompletableFuture<Void> getAvailableFuture() {

Review comment:
       Would it be simpler to just create a new `CompletableFuture` instance 
here? The cost seems negligible, and we would no longer need to depend on 
`flink-runtime` in that case.
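
A minimal sketch of what that could look like, assuming "available" simply means an already-completed future. The class name `AvailableFutureSketch` is hypothetical and only there to make the snippet self-contained; `CompletableFuture.completedFuture` is the standard JDK call.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder class, used only to make the sketch compile on its own.
final class AvailableFutureSketch {

    // Assumption: an "available" future is one that is already completed.
    // Each call returns a fresh, completed instance, so no dependency on
    // flink-runtime is needed to obtain it.
    static CompletableFuture<Void> getAvailableFuture() {
        return CompletableFuture.completedFuture(null);
    }
}
```

Whether a fresh instance per call is acceptable depends on whether any caller compares the future by identity against a shared constant, which this sketch does not address.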

##########
File path: tools/ci/log4j.properties
##########
@@ -73,3 +73,8 @@ logger.consumer.level = OFF
 logger.queryablestate.name = org.apache.flink.queryablestate
 logger.queryablestate.level = TRACE
 logger.queryablestate.appenderRef.out.ref = ConsoleAppender
+
+# Temporarily enable TRACE for SourceReaderBase to debug a deadlock
+logger.sourcereader.name = org.apache.flink.connector.base.source.reader
+logger.sourcereader.level = TRACE

Review comment:
       Are we going to keep this trace logging?

##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
##########
@@ -29,25 +35,94 @@
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-       private static final int NUM_SPLITS = 3;
-       private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-       private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-       private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
+
+       @Test
+       public void testNewFetcherIsIdle() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testFetcherNotIdleAfterSplitAdded() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+               final TestingSourceSplit split = new TestingSourceSplit("test-split");
+
+               fetcher.addSplits(Collections.singletonList(split));
+
+               assertFalse(fetcher.isIdle());
+
+               // need to loop here because the internal wakeup flag handling means we need multiple loops
+               while (fetcher.assignedSplits().isEmpty()) {
+                       fetcher.runOnce();
+                       assertFalse(fetcher.isIdle());
+               }
+       }
+
+       @Test
+       public void testIdleAfterFinishedSplitsEnqueued() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+                       "test-split", new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testNotifiesWhenGoingIdle() {
+               final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+                       "test-split",
+                       queue,
+                       new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+               assertTrue(queue.getAvailabilityFuture().isDone());

Review comment:
       This only tests that the queue returns a completed availability future 
after the fetcher has gone idle. Should we obtain the future before the fetcher 
goes idle and assert here that this same future is done?
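
A minimal sketch of that ordering, using a hypothetical stand-in queue rather than the real `FutureCompletingBlockingQueue` (the class `NotifyingQueueSketch` and its `notifyAvailable()` method are assumptions for illustration only). The point is that the future is captured first, the event happens second, and the assertion is made on the captured instance.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the queue under test; not Flink's implementation.
final class NotifyingQueueSketch {

    private final CompletableFuture<Void> availability = new CompletableFuture<>();

    // Hands out the pending future that will be completed on notification.
    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    // Stands in for whatever the fetcher does to the queue when it goes idle.
    synchronized void notifyAvailable() {
        availability.complete(null);
    }

    // Mirrors the suggested test ordering: capture first, then trigger, then check.
    static boolean futureObtainedBeforeIsCompleted() {
        final NotifyingQueueSketch queue = new NotifyingQueueSketch();

        final CompletableFuture<Void> before = queue.getAvailabilityFuture();
        if (before.isDone()) {
            throw new IllegalStateException("future must be pending before the event");
        }

        queue.notifyAvailable(); // in the real test this would be fetcher.runOnce()

        return before.isDone();
    }
}
```

In the actual test, the equivalent would be to call `queue.getAvailabilityFuture()` before `fetcher.runOnce()` and assert `isDone()` on that captured future afterwards.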





