This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a2760cf39c213095604c680fb171e1d1ce4668d
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 19:47:40 2020 +0200

    [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
---
 .../source/reader/fetcher/SplitFetcherTest.java    | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index eef8328..4fa99dd 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -41,13 +41,14 @@ import static org.junit.Assert.assertTrue;
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-       private static final int NUM_SPLITS = 3;
-       private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-       private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-       private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
 
        @Test
        public void testWakeup() throws InterruptedException {
+               final int numSplits = 3;
+               final int numRecordsPerSplit = 10_000;
+               final int interruptRecordsInterval = 10;
+               final int numTotalRecords = numRecordsPerSplit * numSplits;
+
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
                        new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
                SplitFetcher<int[], MockSourceSplit> fetcher =
@@ -59,10 +60,10 @@ public class SplitFetcherTest {
 
                // Prepare the splits.
                List<MockSourceSplit> splits = new ArrayList<>();
-               for (int i = 0; i < NUM_SPLITS; i++) {
-                       splits.add(new MockSourceSplit(i, 0, NUM_RECORDS_PER_SPLIT));
-                       int base = i * NUM_RECORDS_PER_SPLIT;
-                       for (int j = base; j < base + NUM_RECORDS_PER_SPLIT; j++) {
+               for (int i = 0; i < numSplits; i++) {
+                       splits.add(new MockSourceSplit(i, 0, numRecordsPerSplit));
+                       int base = i * numRecordsPerSplit;
+                       for (int j = base; j < base + numRecordsPerSplit; j++) {
                                splits.get(splits.size() - 1).addRecord(j);
                        }
                }
@@ -81,9 +82,9 @@ public class SplitFetcherTest {
                        @Override
                        public void run() {
                                int lastInterrupt = 0;
-                               while (recordsRead.size() < NUM_TOTAL_RECORDS && !stop.get()) {
+                               while (recordsRead.size() < numTotalRecords && !stop.get()) {
                                        int numRecordsRead = recordsRead.size();
-                                       if (numRecordsRead >= lastInterrupt + INTERRUPT_RECORDS_INTERVAL) {
+                                       if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
                                                fetcher.wakeUp(false);
                                                wakeupTimes.incrementAndGet();
                                                lastInterrupt = numRecordsRead;
@@ -96,7 +97,7 @@ public class SplitFetcherTest {
                        fetcherThread.start();
                        interrupter.start();
 
-                       while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
+                       while (recordsRead.size() < numSplits * numRecordsPerSplit) {
                                final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
                                while (nextBatch.nextSplit() != null) {
                                        int[] arr;
@@ -106,9 +107,9 @@ public class SplitFetcherTest {
                                }
                        }
 
-                       assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
+                       assertEquals(numTotalRecords, recordsRead.size());
                        assertEquals(0, (int) recordsRead.first());
-                       assertEquals(NUM_TOTAL_RECORDS - 1, (int) recordsRead.last());
+                       assertEquals(numTotalRecords - 1, (int) recordsRead.last());
                        assertTrue(wakeupTimes.get() > 0);
                } finally {
                        stop.set(true);

Reply via email to