becketqin commented on a change in pull request #13385:
URL: https://github.com/apache/flink/pull/13385#discussion_r488508741
##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
##########
@@ -275,4 +340,22 @@ private void setWakeUp(boolean value) {
wakeUp = value;
}
}
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private static CompletableFuture<Void> getAvailableFuture() {
Review comment:
Would it be simpler to just create a new `CompletableFuture` instance
here? The cost seems negligible, and we would no longer need to depend on
`flink-runtime`.
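
For illustration, a minimal sketch of the JDK-only variant, assuming the helper only needs to hand out an already-completed future. The class name `AvailabilitySketch` is hypothetical and not part of the PR:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical holder class, used only to make the sketch self-contained.
final class AvailabilitySketch {

    // Returns a fresh, already-completed future on every call.
    // Uses only java.util.concurrent, so no extra module is required.
    static CompletableFuture<Void> getAvailableFuture() {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = getAvailableFuture();
        System.out.println("done = " + future.isDone());
    }
}
```

If callers compare futures by identity, a single shared constant may be the better fit; that depends on how the surrounding code uses the returned future.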
##########
File path: tools/ci/log4j.properties
##########
@@ -73,3 +73,8 @@ logger.consumer.level = OFF
logger.queryablestate.name = org.apache.flink.queryablestate
logger.queryablestate.level = TRACE
logger.queryablestate.appenderRef.out.ref = ConsoleAppender
+
+# Temporarily enable TRACE for SourceReaderBase to debug a deadlock
+logger.sourcereader.name = org.apache.flink.connector.base.source.reader
+logger.sourcereader.level = TRACE
Review comment:
Are we going to keep this trace logging?
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
##########
@@ -29,25 +35,94 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit test for {@link SplitFetcher}.
*/
public class SplitFetcherTest {
- private static final int NUM_SPLITS = 3;
- private static final int NUM_RECORDS_PER_SPLIT = 10_000;
- private static final int INTERRUPT_RECORDS_INTERVAL = 10;
- private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
+
+ @Test
+ public void testNewFetcherIsIdle() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testFetcherNotIdleAfterSplitAdded() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+ final TestingSourceSplit split = new TestingSourceSplit("test-split");
+
+ fetcher.addSplits(Collections.singletonList(split));
+
+ assertFalse(fetcher.isIdle());
+
+ // need to loop here because the internal wakeup flag handling means we need multiple loops
+ while (fetcher.assignedSplits().isEmpty()) {
+ fetcher.runOnce();
+ assertFalse(fetcher.isIdle());
+ }
+ }
+
+ @Test
+ public void testIdleAfterFinishedSplitsEnqueued() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split", new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testNotifiesWhenGoingIdle() {
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split",
+ queue,
+ new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ assertTrue(queue.getAvailabilityFuture().isDone());
Review comment:
This tests that the queue returns an already-completed future after the
fetcher goes idle. Should we obtain a future before the fetcher goes idle and
assert here that that future is done?
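
Roughly what I have in mind, as a self-contained sketch. `AvailabilityNotifierSketch` and `notifyAvailable()` are hypothetical stand-ins for the real queue and for the fetcher going idle; they are not the PR's API:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a queue that exposes an availability future.
final class AvailabilityNotifierSketch {

    private final CompletableFuture<Void> availability = new CompletableFuture<>();

    // Pending until notifyAvailable() is called, completed afterwards.
    CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    // Stands in for the notification sent when the fetcher goes idle.
    void notifyAvailable() {
        availability.complete(null);
    }

    public static void main(String[] args) {
        AvailabilityNotifierSketch queue = new AvailabilityNotifierSketch();

        // Obtain the future BEFORE the idle transition ...
        CompletableFuture<Void> before = queue.getAvailabilityFuture();
        System.out.println("pending before idle: " + !before.isDone());

        queue.notifyAvailable();

        // ... and check that this earlier future was completed.
        System.out.println("completed after idle: " + before.isDone());
    }
}
```

Checking the future obtained beforehand would also catch the case where a waiter that registered earlier is never notified, which a future fetched afterwards cannot reveal.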