This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8fb2c3a4cf3a37b87403e462f675bb5d614a4b51 Author: Arvid Heise <[email protected]> AuthorDate: Tue Sep 7 22:23:15 2021 +0200 [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint. --- .../connectors/kinesis/FlinkKinesisITCase.java | 141 +++++++++++++++------ .../kinesis/internals/KinesisDataFetcherTest.java | 11 +- 2 files changed, 105 insertions(+), 47 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java index 6a394860450..705be463475 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java @@ -19,44 +19,53 @@ package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient; import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; import org.apache.flink.util.DockerImageVersions; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestNameProvider; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.utility.DockerImageName; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; /** IT cases for using Kinesis consumer/producer based on Kinesalite. */ -@Ignore("See FLINK-23528") public class FlinkKinesisITCase extends TestLogger { - public static final String TEST_STREAM = "test_stream"; + private String stream; + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class); @ClassRule public static final MiniClusterWithClientResource MINI_CLUSTER = @@ -69,20 +78,34 @@ public class FlinkKinesisITCase extends TestLogger { @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Rule public SharedObjects sharedObjects = SharedObjects.create(); + private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema(); private KinesisPubsubClient client; @Before - public void setupClient() { + public void setupClient() throws Exception { client = new KinesisPubsubClient(kinesalite.getContainerProperties()); + stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", ""); + client.createTopic(stream, 1, new Properties()); + } + + @Test + public void testStopWithSavepoint() throws Exception { + testStopWithSavepoint(false); + } + + @Test + public void testStopWithSavepointWithDrain() throws Exception { + testStopWithSavepoint(true); } /** * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170). * * <ol> - * <li>The test setups up a stream with 100 records and creates a Flink job that reads them + * <li>The test setups up a stream with 1000 records and creates a Flink job that reads them * with very slowly (using up a large chunk of time of the mailbox). * <li>After ensuring that consumption has started, the job is stopped in a parallel thread. * <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during @@ -90,83 +113,117 @@ public class FlinkKinesisITCase extends TestLogger { * <li>With the fix, the job proceeds and we can lift the backpressure. * </ol> */ - @Test - public void testStopWithSavepoint() throws Exception { - client.createTopic(TEST_STREAM, 1, new Properties()); - + private void testStopWithSavepoint(boolean drain) throws Exception { // add elements to the test stream int numElements = 1000; client.sendMessage( - TEST_STREAM, + stream, IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); + env.enableCheckpointing(100L); Properties config = kinesalite.getContainerProperties(); config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name()); FlinkKinesisConsumer<String> consumer = - new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config); + new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config); - DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper()); + SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1)); + DataStream<String> stream = + env.addSource(consumer).map(new WaitingMapper(savepointTrigger)); // call stop with savepoint in another thread ForkJoinTask<Object> stopTask = ForkJoinPool.commonPool() .submit( () -> { - WaitingMapper.firstElement.await(); - stopWithSavepoint(); - WaitingMapper.stopped = true; + savepointTrigger.get().await(); + stopWithSavepoint(drain); return null; }); try { List<String> result = stream.executeAndCollect(10000); // stop with savepoint will most likely only return a small subset of the elements // validate that the prefix is as expected - assertThat(result, hasSize(lessThan(numElements))); - assertThat( - result, - equalTo( - IntStream.range(0, numElements) - .mapToObj(String::valueOf) - .collect(Collectors.toList()) - .subList(0, result.size()))); + if (drain) { + assertThat( + result, + contains( + IntStream.range(0, numElements) + .mapToObj(String::valueOf) + .toArray())); + } else { + // stop with savepoint will most likely only return a small subset of the elements + // validate that the prefix is as expected + assertThat( + result, + contains( + IntStream.range(0, result.size()) + .mapToObj(String::valueOf) + .toArray())); + } } finally { - stopTask.cancel(true); + stopTask.get(); } } - private String stopWithSavepoint() throws Exception { + private String stopWithSavepoint(boolean drain) throws Exception { JobStatusMessage job = MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get(); return MINI_CLUSTER .getClusterClient() .stopWithSavepoint( job.getJobId(), - true, + drain, temp.getRoot().getAbsolutePath(), SavepointFormatType.CANONICAL) .get(); } - private static class WaitingMapper implements MapFunction<String, String> { - static CountDownLatch firstElement; - static volatile boolean stopped; + private static class WaitingMapper + implements MapFunction<String, String>, CheckpointedFunction { + private final SharedReference<CountDownLatch> savepointTrigger; + private volatile boolean savepointTriggered; + // keep track on when the last checkpoint occurred + private transient Deadline checkpointDeadline; + private final AtomicInteger numElements = new AtomicInteger(); + + WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) { + this.savepointTrigger = savepointTrigger; + checkpointDeadline = Deadline.fromNow(Duration.ofDays(1)); + } - WaitingMapper() { - firstElement = new CountDownLatch(1); - stopped = false; + private void readObject(ObjectInputStream stream) + throws ClassNotFoundException, IOException { + stream.defaultReadObject(); + checkpointDeadline = Deadline.fromNow(Duration.ofDays(1)); } @Override public String map(String value) throws Exception { - if (firstElement.getCount() > 0) { - firstElement.countDown(); - } - if (!stopped) { - Thread.sleep(100); + numElements.incrementAndGet(); + if (!savepointTriggered) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + savepointTriggered = checkpointDeadline.isOverdue(); } return value; } + + @Override + public void snapshotState(FunctionSnapshotContext context) { + // assume that after the first savepoint, this function will only see new checkpoint + // when the final savepoint is triggered + if (numElements.get() > 0) { + this.checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(1)); + savepointTrigger.get().countDown(); + } + LOG.info("snapshotState {} {}", context.getCheckpointId(), numElements); + } + + @Override + public void initializeState(FunctionInitializationContext context) {} } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index 8be2bb41097..9f6d8e3f27f 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -17,11 +17,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals; -import com.amazonaws.services.kinesis.model.HashKeyRange; -import com.amazonaws.services.kinesis.model.SequenceNumberRange; -import com.amazonaws.services.kinesis.model.Shard; -import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.commons.lang3.mutable.MutableLong; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.CheckedThread; @@ -49,6 +44,12 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; + +import com.amazonaws.services.kinesis.model.HashKeyRange; +import com.amazonaws.services.kinesis.model.SequenceNumberRange; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableLong; import org.junit.Assert; import org.junit.Test; import org.powermock.reflect.Whitebox;
