This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cd5fe93035c5d875d10d9f3cb1d84d4ed045dc0 Author: Krzysztof Dziolak <[email protected]> AuthorDate: Fri Jul 1 10:08:45 2022 +0100 [FLINK-23528][connectors/kinesis]Graceful shutdown of Kinesis Consumer in EFO mode --- .../kinesis/internals/KinesisDataFetcher.java | 7 ++- .../publisher/fanout/FanOutShardSubscriber.java | 3 +- .../connectors/kinesis/FlinkKinesisITCase.java | 68 +++++++++++++++------- .../fanout/FanOutShardSubscriberTest.java | 20 +++++++ 4 files changed, 74 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index e12bdaa0b3a..4fcc80a250e 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> { LOG.warn("Encountered exception closing record publisher factory", e); } } finally { - shardConsumersExecutor.shutdown(); + gracefulShutdownShardConsumers(); cancelFuture.complete(null); @@ -852,6 +852,11 @@ public class KinesisDataFetcher<T> { StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams); } + /** Gracefully stops shardConsumersExecutor without interrupting running threads. */ + private void gracefulShutdownShardConsumers() { + shardConsumersExecutor.shutdown(); + } + /** * Returns a flag indicating if this fetcher is running. 
* diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java index afda248773c..a280a8f6537 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java @@ -361,8 +361,7 @@ public class FanOutShardSubscriber { } } else if (subscriptionEvent.isSubscriptionComplete()) { // The subscription is complete, but the shard might not be, so we return incomplete - result = false; - break; + return false; } else { handleError(subscriptionEvent.getThrowable()); result = false; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java index 705be463475..ee8066a61aa 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java @@ -39,7 +39,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestNameProvider; import org.junit.Before; -import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -58,6 +58,8 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; +import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger { private String stream; private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class); - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER = + @Rule + public final MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder().build()); - @ClassRule - public static KinesaliteContainer kinesalite = + @Rule + public KinesaliteContainer kinesalite = new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE)); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger { @Before public void setupClient() throws Exception { - client = new KinesisPubsubClient(kinesalite.getContainerProperties()); + client = new KinesisPubsubClient(getContainerProperties()); stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", ""); client.createTopic(stream, 1, new Properties()); } @Test public void testStopWithSavepoint() throws Exception { - testStopWithSavepoint(false); + testStopWithSavepoint(false, false); } @Test public void testStopWithSavepointWithDrain() throws Exception { - testStopWithSavepoint(true); + testStopWithSavepoint(true, false); + } + + @Test + @Ignore("Kinesalite does not support EFO") + public void testStopWithSavepointWithEfo() throws Exception { + testStopWithSavepoint(false, true); + } + + @Test + @Ignore("Kinesalite does not support EFO") + public void 
testStopWithSavepointWithDrainAndEfo() throws Exception { + testStopWithSavepoint(true, true); } /** @@ -113,7 +127,7 @@ public class FlinkKinesisITCase extends TestLogger { * <li>With the fix, the job proceeds and we can lift the backpressure. * </ol> */ - private void testStopWithSavepoint(boolean drain) throws Exception { + private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception { // add elements to the test stream int numElements = 1000; client.sendMessage( @@ -124,14 +138,10 @@ public class FlinkKinesisITCase extends TestLogger { env.setParallelism(1); env.enableCheckpointing(100L); - Properties config = kinesalite.getContainerProperties(); - config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name()); - FlinkKinesisConsumer<String> consumer = - new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config); - SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1)); - DataStream<String> stream = - env.addSource(consumer).map(new WaitingMapper(savepointTrigger)); + DataStream<String> outputStream = + env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger)); + // call stop with savepoint in another thread ForkJoinTask<Object> stopTask = ForkJoinPool.commonPool() @@ -142,7 +152,7 @@ public class FlinkKinesisITCase extends TestLogger { return null; }); try { - List<String> result = stream.executeAndCollect(10000); + List<String> result = outputStream.executeAndCollect(10000); // stop with savepoint will most likely only return a small subset of the elements // validate that the prefix is as expected if (drain) { @@ -167,10 +177,24 @@ public class FlinkKinesisITCase extends TestLogger { } } + private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) { + Properties config = getContainerProperties(); + config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name()); + if (efo) { + config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); + 
config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app"); + } + return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config); + } + + private Properties getContainerProperties() { + return kinesalite.getContainerProperties(); + } + private String stopWithSavepoint(boolean drain) throws Exception { JobStatusMessage job = - MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get(); - return MINI_CLUSTER + miniCluster.getClusterClient().listJobs().get().stream().findFirst().get(); + return miniCluster .getClusterClient() .stopWithSavepoint( job.getJobId(), @@ -190,13 +214,15 @@ public class FlinkKinesisITCase extends TestLogger { WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) { this.savepointTrigger = savepointTrigger; - checkpointDeadline = Deadline.fromNow(Duration.ofDays(1)); + // effectively set a 10 minute timeout on the wait + // this is reduced to 1 second once the data starts flowing + checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10)); } private void readObject(ObjectInputStream stream) throws ClassNotFoundException, IOException { stream.defaultReadObject(); - checkpointDeadline = Deadline.fromNow(Duration.ofDays(1)); + checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10)); } @Override diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java index a41e7026984..9c711230096 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java @@ -31,6 +31,7 @@ import
software.amazon.awssdk.services.kinesis.model.StartingPosition; import java.time.Duration; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT; +import static org.junit.Assert.assertFalse; /** Tests for {@link FanOutShardSubscriber}. */ public class FanOutShardSubscriberTest { @@ -120,6 +121,25 @@ public class FanOutShardSubscriberTest { subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {}); } + @Test + public void testSubscriptionCompletion() throws Exception { + FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 = + FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord(); + + FanOutShardSubscriber subscriber = + new FanOutShardSubscriber( + "consumerArn", + "shardId", + errorKinesisV2, + DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT); + + StartingPosition startingPosition = StartingPosition.builder().build(); + boolean result = + subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {}); + + assertFalse(result); + } + @Test public void testTimeoutSubscribingToShard() throws Exception { thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
