This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a96c10452c81a79f967b2a2607468b440a95c1a Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Feb 1 14:25:29 2022 +0100 [hotfix][test] Fix SourceOperatorTest#testSameAvailabilityFuture Previously this test was always returning a completed future, which technically speaking should have been converted to AVAILABLE. It also wasn't testing the important behaviour that, when the operator is unavailable for a long period of time, getAvailableFuture shouldn't produce a new CompletableFuture on each call. --- .../api/operators/SourceOperatorIdleTest.java | 69 ++++++++++++++++++++++ .../api/operators/SourceOperatorTest.java | 12 ---- .../api/operators/SourceOperatorTestContext.java | 8 ++- 3 files changed, 75 insertions(+), 14 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java new file mode 100644 index 0000000..ab42128 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorIdleTest.java @@ -0,0 +1,69 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; + +/** Unit test for idle {@link SourceOperator}. */ +@SuppressWarnings("serial") +public class SourceOperatorIdleTest { + + @Nullable private SourceOperatorTestContext context; + @Nullable private SourceOperator<Integer, MockSourceSplit> operator; + + @Before + public void setup() throws Exception { + context = new SourceOperatorTestContext(); + operator = context.getOperator(); + } + + @After + public void tearDown() throws Exception { + context.close(); + context = null; + operator = null; + } + + @Test + public void testSameAvailabilityFuture() throws Exception { + operator.initializeState(context.createStateContext()); + operator.open(); + operator.emitNext(new CollectingDataOutput<>()); + final CompletableFuture<?> initialFuture = operator.getAvailableFuture(); + assertFalse(initialFuture.isDone()); + final CompletableFuture<?> secondFuture = operator.getAvailableFuture(); + assertThat(initialFuture, not(sameInstance(AvailabilityProvider.AVAILABLE))); + assertThat(secondFuture, sameInstance(initialFuture)); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index c45492b..82e56d7 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; -import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.AddSplitEvent; @@ -45,9 +44,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -195,12 +191,4 @@ public class SourceOperatorTest { operator.notifyCheckpointAborted(100L); assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); } - - @Test - public void testSameAvailabilityFuture() { - final CompletableFuture<?> initialFuture = operator.getAvailableFuture(); - final CompletableFuture<?> secondFuture = operator.getAvailableFuture(); - assertThat(initialFuture, not(sameInstance(AvailabilityProvider.AVAILABLE))); - assertThat(secondFuture, sameInstance(initialFuture)); - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index f8e63f2..d87fc27 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -47,7 +47,7 @@ import java.util.Collections; import static org.apache.flink.util.Preconditions.checkState; -/** Base class for testing {@link SourceOperator}. */ +/** Helper class for testing {@link SourceOperator}. */ @SuppressWarnings("serial") public class SourceOperatorTestContext implements AutoCloseable { @@ -59,7 +59,11 @@ public class SourceOperatorTestContext implements AutoCloseable { private SourceOperator<Integer, MockSourceSplit> operator; public SourceOperatorTestContext() throws Exception { - mockSourceReader = new MockSourceReader(); + this(false); + } + + public SourceOperatorTestContext(boolean idle) throws Exception { + mockSourceReader = new MockSourceReader(idle, idle); mockGateway = new MockOperatorEventGateway(); operator = new TestingSourceOperator<>(
