[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565298#comment-16565298 ]
ASF GitHub Bot commented on FLINK-9926: --------------------------------------- asfgit closed pull request #6427: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer. URL: https://github.com/apache/flink/pull/6427 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 65de24c23d3..13de0324ccf 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -163,6 +163,9 @@ /** Reference to the first error thrown by any of the {@link ShardConsumer} threads. */ private final AtomicReference<Throwable> error; + /** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */ + private final FlinkKinesisProxyFactory kinesisProxyFactory; + /** The Kinesis proxy that the fetcher will be using to discover new shards. */ private final KinesisProxyInterface kinesis; @@ -179,6 +182,13 @@ private volatile boolean running = true; + /** + * Factory to create Kinesis proxy instances used by a fetcher. + */ + public interface FlinkKinesisProxyFactory { + KinesisProxyInterface create(Properties configProps); + } + /** * Creates a Kinesis Data Fetcher. 
* @@ -204,7 +214,7 @@ public KinesisDataFetcher(List<String> streams, new AtomicReference<>(), new ArrayList<>(), createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), - KinesisProxy.create(configProps)); + KinesisProxy::create); } @VisibleForTesting @@ -218,7 +228,7 @@ protected KinesisDataFetcher(List<String> streams, AtomicReference<Throwable> error, List<KinesisStreamShardState> subscribedShardsState, HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds, - KinesisProxyInterface kinesis) { + FlinkKinesisProxyFactory kinesisProxyFactory) { this.streams = checkNotNull(streams); this.configProps = checkNotNull(configProps); this.sourceContext = checkNotNull(sourceContext); @@ -228,7 +238,8 @@ protected KinesisDataFetcher(List<String> streams, this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask(); this.deserializationSchema = checkNotNull(deserializationSchema); this.shardAssigner = checkNotNull(shardAssigner); - this.kinesis = checkNotNull(kinesis); + this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory); + this.kinesis = kinesisProxyFactory.create(configProps); this.consumerMetricGroup = runtimeContext.getMetricGroup() .addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP); @@ -241,6 +252,29 @@ protected KinesisDataFetcher(List<String> streams, createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks()); } + /** + * Create a new shard consumer. + * Override this method to customize shard consumer behavior in subclasses. 
+ * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to + * @param subscribedShard the shard this consumer is subscribed to + * @param lastSequenceNum the sequence number in the shard to start consuming + * @param shardMetricsReporter the reporter to report metrics to + * @return shard consumer + */ + protected ShardConsumer createShardConsumer( + Integer subscribedShardStateIndex, + StreamShardHandle subscribedShard, + SequenceNumber lastSequenceNum, + ShardMetricsReporter shardMetricsReporter) { + return new ShardConsumer<>( + this, + subscribedShardStateIndex, + subscribedShard, + lastSequenceNum, + this.kinesisProxyFactory.create(configProps), + shardMetricsReporter); + } + /** * Starts the fetcher. After starting the fetcher, it can only * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}. @@ -297,8 +331,7 @@ public void runFetcher() throws Exception { } shardConsumersExecutor.submit( - new ShardConsumer<>( - this, + createShardConsumer( seededStateIndex, subscribedShardsState.get(seededStateIndex).getStreamShardHandle(), subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(), @@ -344,8 +377,7 @@ public void runFetcher() throws Exception { } shardConsumersExecutor.submit( - new ShardConsumer<>( - this, + createShardConsumer( newStateIndex, newShardState.getStreamShardHandle(), newShardState.getLastProcessedSequenceNum(), diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 77d180cc395..d563a5cf2bf 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -24,7 +24,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; -import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; @@ -87,28 +86,15 @@ * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to * @param subscribedShard the shard this consumer is subscribed to * @param lastSequenceNum the sequence number in the shard to start consuming + * @param kinesis the proxy instance to interact with Kinesis * @param shardMetricsReporter the reporter to report metrics to */ public ShardConsumer(KinesisDataFetcher<T> fetcherRef, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, + KinesisProxyInterface kinesis, ShardMetricsReporter shardMetricsReporter) { - this(fetcherRef, - subscribedShardStateIndex, - subscribedShard, - lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration()), - shardMetricsReporter); - } - - /** This constructor is exposed for testing purposes. 
*/ - protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, - Integer subscribedShardStateIndex, - StreamShardHandle subscribedShard, - SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis, - ShardMetricsReporter shardMetricsReporter) { this.fetcherRef = checkNotNull(fetcherRef); this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); this.subscribedShard = checkNotNull(subscribedShard); @@ -152,61 +138,70 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, } } - @SuppressWarnings("unchecked") - @Override - public void run() { + /** + * Find the initial shard iterator to start getting records from. + * @return shard iterator + * @throws Exception + */ + protected String getInitialShardIterator() throws Exception { String nextShardItr; - try { - // before infinitely looping, we set the initial nextShardItr appropriately + // before infinitely looping, we set the initial nextShardItr appropriately - if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { - // if the shard is already closed, there will be no latest next record to get for this shard - if (subscribedShard.isClosed()) { - nextShardItr = null; - } else { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); - } - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { + // if the shard is already closed, there will be no latest next record to get for this shard + if (subscribedShard.isClosed()) { nextShardItr = null; - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { - nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp); } else { - // we will be starting from an actual sequence number (due to restore from failure). - // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records - // from the last aggregated record; otherwise, we can simply start iterating from the record right after. - - if (lastSequenceNum.isAggregated()) { - String itrForLastAggregatedRecord = - kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); - - // get only the last aggregated record - GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); - - List<UserRecord> fetchedRecords = deaggregateRecords( - getRecordsResult.getRecords(), - subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), - subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); - - long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber(); - for (UserRecord record : fetchedRecords) { - // we have found a dangling sub-record if it has a larger subsequence number - // than our last sequence number; if so, collect the record and update state - if (record.getSubSequenceNumber() > lastSubSequenceNum) { - deserializeRecordForCollectionAndUpdateState(record); - } + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); + } + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { + nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + nextShardItr = null; + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { + nextShardItr = kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp); + } else { + // we will be starting from an actual sequence number (due to restore from failure). + // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records + // from the last aggregated record; otherwise, we can simply start iterating from the record right after. + + if (lastSequenceNum.isAggregated()) { + String itrForLastAggregatedRecord = + kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + + // get only the last aggregated record + GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); + + List<UserRecord> fetchedRecords = deaggregateRecords( + getRecordsResult.getRecords(), + subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), + subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + + long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber(); + for (UserRecord record : fetchedRecords) { + // we have found a dangling sub-record if it has a larger subsequence number + // than our last sequence number; if so, collect the record and update state + if (record.getSubSequenceNumber() > lastSubSequenceNum) { + deserializeRecordForCollectionAndUpdateState(record); } - - // set the nextShardItr so we can continue iterating in the next while loop - nextShardItr = getRecordsResult.getNextShardIterator(); - } else { - // the last record was non-aggregated, so we can simply start from the next record - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); } + + // set the nextShardItr so we can continue iterating in the next while loop + nextShardItr = getRecordsResult.getNextShardIterator(); + } else { + // the last record was non-aggregated, so we can simply start from the next record + nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); } + } + return nextShardItr; + } + @SuppressWarnings("unchecked") + @Override + public void run() { + try { + String nextShardItr = getInitialShardIterator(); long lastTimeNanos = 0; while (isRunning()) { if (nextShardItr == null) { diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 9e5c6cbe450..e25a6015771 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -36,14 +36,14 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory; import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext; import com.fasterxml.jackson.databind.deser.DeserializerFactory; -import com.amazonaws.services.securitytoken.AWSSecurityTokenService; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import java.io.IOException; import java.util.HashMap; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java index 4dcab33bd1a..49a9ed71ae0 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java @@ -20,6 +20,9 @@ import org.apache.flink.annotation.Internal; +/** + * Internal use. + */ @Internal public class TimeoutLatch { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index b7cfb2d32d1..21588c9a7a7 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -72,7 +72,7 @@ public TestableKinesisDataFetcher( thrownErrorUnderTest, subscribedShardsStateUnderTest, subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, - fakeKinesis); + (properties) -> fakeKinesis); this.runWaiter = new OneShotLatch(); this.initialDiscoveryWaiter = new OneShotLatch(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Allow for ShardConsumer override in Kinesis consumer
> ----------------------------------------------------
>
>                 Key: FLINK-9926
>                 URL: https://issues.apache.org/jira/browse/FLINK-9926
>             Project: Flink
>          Issue Type: Task
>          Components: Kinesis Connector
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Minor
>              Labels: pull-request-available
>
> There are various reasons why the user may want to override the consumer.
> Examples are to optimize the run loop or to add additional metrics or
> logging. Instead of baking the constructor into runFetcher, create a
> customizable factory method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)