This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 9ee0fe3 [FLINK-30224][Connectors/Kinesis] Added an IT test for slow
FlinKinesisConsumer's run which caused NPE in close (#41)
9ee0fe3 is described below
commit 9ee0fe32d1a9e6d62e514824ab553956fe88ee9d
Author: Astamur Kirillin <[email protected]>
AuthorDate: Wed Dec 21 10:32:38 2022 +0000
[FLINK-30224][Connectors/Kinesis] Added an IT test for slow
FlinKinesisConsumer's run which caused NPE in close (#41)
Co-authored-by: Astamur Kirillin <[email protected]>
---
.../connectors/kinesis/FlinkKinesisITCase.java | 99 +++++++++++++++++++++-
1 file changed, 97 insertions(+), 2 deletions(-)
diff --git
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 7f7676a..6066d82 100644
---
a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++
b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
@@ -29,7 +30,10 @@ import
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
@@ -38,6 +42,7 @@ import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TestNameProvider;
+import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -52,9 +57,12 @@ import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -63,6 +71,8 @@ import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
import static
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertTrue;
/** IT cases for using Kinesis consumer/producer based on Kinesalite. */
public class FlinkKinesisITCase extends TestLogger {
@@ -115,6 +125,40 @@ public class FlinkKinesisITCase extends TestLogger {
testStopWithSavepoint(true, true);
}
+ /**
+ * Tests stop-with-savepoint while a slow {@link
+ * FlinkKinesisConsumer#run(SourceFunction.SourceContext)} has not finished yet and therefore
+ * has not set a {@link KinesisDataFetcher} (FLINK-30224: this used to cause an NPE in close).
+ */
+ @Test
+ public void testStopWithSavepointWithSlowConsumer() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(100L);
+
+ SharedReference<CyclicBarrier> savepointTrigger =
sharedObjects.add(new CyclicBarrier(2));
+ env.addSource(createSlowKinesisConsumer(savepointTrigger)).addSink(new
DiscardingSink<>());
+
+ ForkJoinTask<String> stopTask =
+ ForkJoinPool.commonPool()
+ .submit(
+ () -> {
+ // Wait until run method is reached in the
consumer
+ savepointTrigger.get().await();
+ String result =
stopWithSavepoint(savepointTrigger);
+ log.info("StopWithSavepoint result: {}",
result);
+ return result;
+ });
+
+ env.execute();
+ String result = stopTask.get(1, TimeUnit.MINUTES);
+
+ MatcherAssert.assertThat("Savepoint result shouldn't be null", result,
notNullValue());
+ assertTrue(
+ "Unexpected savepoint file's name format",
+ result.matches("^file:.*/savepoint-.*$"));
+ }
+
/**
* Tests that pending elements do not cause a deadlock during stop with
savepoint (FLINK-17170).
*
@@ -183,21 +227,42 @@ public class FlinkKinesisITCase extends TestLogger {
return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
}
+ private FlinkKinesisConsumer<String> createSlowKinesisConsumer(
+ SharedReference<CyclicBarrier> savepointTrigger) {
+ Properties config = getContainerProperties();
+ config.setProperty(STREAM_INITIAL_POSITION,
InitialPosition.TRIM_HORIZON.name());
+ return new SlowFlinkKinesisConsumer<>(stream, STRING_SCHEMA, config,
savepointTrigger);
+ }
+
private Properties getContainerProperties() {
return kinesalite.getContainerProperties();
}
private String stopWithSavepoint(boolean drain) throws Exception {
+ return callStopWithSavepoint(drain).get();
+ }
+
+ private String stopWithSavepoint(SharedReference<CyclicBarrier>
savepointTrigger)
+ throws Exception {
+ CompletableFuture<String> resultFuture = callStopWithSavepoint(false);
+ // Stop has been requested: join the barrier again to release the consumer's second await in run()
+ savepointTrigger.get().await();
+
+ return resultFuture.get();
+ }
+
+ private CompletableFuture<String> callStopWithSavepoint(boolean drain)
throws Exception {
JobStatusMessage job =
miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+
+ log.info("Calling stopWithSavepoint: {}", job.getJobId());
return miniCluster
.getClusterClient()
.stopWithSavepoint(
job.getJobId(),
drain,
temp.getRoot().getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
+ SavepointFormatType.CANONICAL);
}
private static class WaitingMapper
@@ -248,4 +313,34 @@ public class FlinkKinesisITCase extends TestLogger {
@Override
public void initializeState(FunctionInitializationContext context) {}
}
+
+ /**
+ * A {@link FlinkKinesisConsumer} whose {@code run()} is deliberately slow: it blocks twice on
+ * a {@link CyclicBarrier} shared with the test before delegating to the real implementation.
+ */
+ private static class SlowFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {
+ private final SharedReference<CyclicBarrier> savepointTrigger;
+
+ public SlowFlinkKinesisConsumer(
+ String stream,
+ DeserializationSchema<T> deserializer,
+ Properties configProps,
+ SharedReference<CyclicBarrier> savepointTrigger) {
+ super(stream, deserializer, configProps);
+ this.savepointTrigger = savepointTrigger;
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ // Wait until it's allowed to do a stop-with-savepoint operation
+ // (a job and a task are both in RUNNING state)
+ savepointTrigger.get().await();
+
+ // Imitate a slow run operation until stop is called. The barrier re-arms itself once
+ // tripped; calling reset() here could break the test thread's concurrent await().
+ savepointTrigger.get().await();
+
+ super.run(sourceContext);
+ }
+ }
}