This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a2760cf39c213095604c680fb171e1d1ce4668d
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 19:47:40 2020 +0200

    [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
---
 .../source/reader/fetcher/SplitFetcherTest.java | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index eef8328..4fa99dd 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -41,13 +41,14 @@ import static org.junit.Assert.assertTrue;
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-	private static final int NUM_SPLITS = 3;
-	private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-	private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-	private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
 
 	@Test
 	public void testWakeup() throws InterruptedException {
+		final int numSplits = 3;
+		final int numRecordsPerSplit = 10_000;
+		final int interruptRecordsInterval = 10;
+		final int numTotalRecords = numRecordsPerSplit * numSplits;
+
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
 			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
@@ -59,10 +60,10 @@ public class SplitFetcherTest {
 
 		// Prepare the splits.
 		List<MockSourceSplit> splits = new ArrayList<>();
-		for (int i = 0; i < NUM_SPLITS; i++) {
-			splits.add(new MockSourceSplit(i, 0, NUM_RECORDS_PER_SPLIT));
-			int base = i * NUM_RECORDS_PER_SPLIT;
-			for (int j = base; j < base + NUM_RECORDS_PER_SPLIT; j++) {
+		for (int i = 0; i < numSplits; i++) {
+			splits.add(new MockSourceSplit(i, 0, numRecordsPerSplit));
+			int base = i * numRecordsPerSplit;
+			for (int j = base; j < base + numRecordsPerSplit; j++) {
 				splits.get(splits.size() - 1).addRecord(j);
 			}
 		}
@@ -81,9 +82,9 @@ public class SplitFetcherTest {
 				@Override
 				public void run() {
 					int lastInterrupt = 0;
-					while (recordsRead.size() < NUM_TOTAL_RECORDS && !stop.get()) {
+					while (recordsRead.size() < numTotalRecords && !stop.get()) {
 						int numRecordsRead = recordsRead.size();
-						if (numRecordsRead >= lastInterrupt + INTERRUPT_RECORDS_INTERVAL) {
+						if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
 							fetcher.wakeUp(false);
 							wakeupTimes.incrementAndGet();
 							lastInterrupt = numRecordsRead;
@@ -96,7 +97,7 @@ public class SplitFetcherTest {
 			fetcherThread.start();
 			interrupter.start();
 
-			while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
+			while (recordsRead.size() < numSplits * numRecordsPerSplit) {
 				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
 				while (nextBatch.nextSplit() != null) {
 					int[] arr;
@@ -106,9 +107,9 @@ public class SplitFetcherTest {
 				}
 			}
 
-			assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
+			assertEquals(numTotalRecords, recordsRead.size());
 			assertEquals(0, (int) recordsRead.first());
-			assertEquals(NUM_TOTAL_RECORDS - 1, (int) recordsRead.last());
+			assertEquals(numTotalRecords - 1, (int) recordsRead.last());
 			assertTrue(wakeupTimes.get() > 0);
 		} finally {
 			stop.set(true);
