[ https://issues.apache.org/jira/browse/FLINK-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565298#comment-16565298 ]

ASF GitHub Bot commented on FLINK-9926:
---------------------------------------

asfgit closed pull request #6427: [FLINK-9926][Kinesis Connector] Allow for ShardConsumer override in Kinesis consumer.
URL: https://github.com/apache/flink/pull/6427
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 65de24c23d3..13de0324ccf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -163,6 +163,9 @@
        /** Reference to the first error thrown by any of the {@link ShardConsumer} threads. */
        private final AtomicReference<Throwable> error;
 
+       /** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */
+       private final FlinkKinesisProxyFactory kinesisProxyFactory;
+
        /** The Kinesis proxy that the fetcher will be using to discover new shards. */
        private final KinesisProxyInterface kinesis;
 
@@ -179,6 +182,13 @@
 
        private volatile boolean running = true;
 
+       /**
+        * Factory to create Kinesis proxy instances used by a fetcher.
+        */
+       public interface FlinkKinesisProxyFactory {
+               KinesisProxyInterface create(Properties configProps);
+       }
+
        /**
         * Creates a Kinesis Data Fetcher.
         *
@@ -204,7 +214,7 @@ public KinesisDataFetcher(List<String> streams,
                        new AtomicReference<>(),
                        new ArrayList<>(),
                        createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-                       KinesisProxy.create(configProps));
+                       KinesisProxy::create);
        }
 
        @VisibleForTesting
@@ -218,7 +228,7 @@ protected KinesisDataFetcher(List<String> streams,
                                                                AtomicReference<Throwable> error,
                                                                List<KinesisStreamShardState> subscribedShardsState,
                                                                HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
-                                                               KinesisProxyInterface kinesis) {
+                                                               FlinkKinesisProxyFactory kinesisProxyFactory) {
                this.streams = checkNotNull(streams);
                this.configProps = checkNotNull(configProps);
                this.sourceContext = checkNotNull(sourceContext);
@@ -228,7 +238,8 @@ protected KinesisDataFetcher(List<String> streams,
                this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
                this.deserializationSchema = checkNotNull(deserializationSchema);
                this.shardAssigner = checkNotNull(shardAssigner);
-               this.kinesis = checkNotNull(kinesis);
+               this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
+               this.kinesis = kinesisProxyFactory.create(configProps);
 
                this.consumerMetricGroup = runtimeContext.getMetricGroup()
                        .addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
@@ -241,6 +252,29 @@ protected KinesisDataFetcher(List<String> streams,
                        createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
        }
 
+       /**
+        * Create a new shard consumer.
+        * Override this method to customize shard consumer behavior in subclasses.
+        * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+        * @param subscribedShard the shard this consumer is subscribed to
+        * @param lastSequenceNum the sequence number in the shard to start consuming
+        * @param shardMetricsReporter the reporter to report metrics to
+        * @return shard consumer
+        */
+       protected ShardConsumer createShardConsumer(
+               Integer subscribedShardStateIndex,
+               StreamShardHandle subscribedShard,
+               SequenceNumber lastSequenceNum,
+               ShardMetricsReporter shardMetricsReporter) {
+               return new ShardConsumer<>(
+                       this,
+                       subscribedShardStateIndex,
+                       subscribedShard,
+                       lastSequenceNum,
+                       this.kinesisProxyFactory.create(configProps),
+                       shardMetricsReporter);
+       }
+
        /**
         * Starts the fetcher. After starting the fetcher, it can only
         * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -297,8 +331,7 @@ public void runFetcher() throws Exception {
                                        }
 
                                shardConsumersExecutor.submit(
-                                       new ShardConsumer<>(
-                                               this,
+                                       createShardConsumer(
                                                seededStateIndex,
                                                subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
                                                subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
@@ -344,8 +377,7 @@ public void runFetcher() throws Exception {
                                }
 
                                shardConsumersExecutor.submit(
-                                       new ShardConsumer<>(
-                                               this,
+                                       createShardConsumer(
                                                newStateIndex,
                                                newShardState.getStreamShardHandle(),
                                                newShardState.getLastProcessedSequenceNum(),
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 77d180cc395..d563a5cf2bf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -24,7 +24,6 @@
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
@@ -87,28 +86,15 @@
         * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
         * @param subscribedShard the shard this consumer is subscribed to
         * @param lastSequenceNum the sequence number in the shard to start consuming
+        * @param kinesis the proxy instance to interact with Kinesis
         * @param shardMetricsReporter the reporter to report metrics to
         */
        public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
                                                Integer subscribedShardStateIndex,
                                                StreamShardHandle subscribedShard,
                                                SequenceNumber lastSequenceNum,
+                                               KinesisProxyInterface kinesis,
                                                ShardMetricsReporter shardMetricsReporter) {
-               this(fetcherRef,
-                       subscribedShardStateIndex,
-                       subscribedShard,
-                       lastSequenceNum,
-                       KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
-                       shardMetricsReporter);
-       }
-
-       /** This constructor is exposed for testing purposes. */
-       protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-                                                       Integer subscribedShardStateIndex,
-                                                       StreamShardHandle subscribedShard,
-                                                       SequenceNumber lastSequenceNum,
-                                                       KinesisProxyInterface kinesis,
-                                                       ShardMetricsReporter shardMetricsReporter) {
                this.fetcherRef = checkNotNull(fetcherRef);
                this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
                this.subscribedShard = checkNotNull(subscribedShard);
@@ -152,61 +138,70 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
                }
        }
 
-       @SuppressWarnings("unchecked")
-       @Override
-       public void run() {
+       /**
+        * Find the initial shard iterator to start getting records from.
+        * @return shard iterator
+        * @throws Exception
+        */
+       protected String getInitialShardIterator() throws Exception {
                String nextShardItr;
 
-               try {
-                       // before infinitely looping, we set the initial nextShardItr appropriately
+               // before infinitely looping, we set the initial nextShardItr appropriately
 
-                       if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
-                               // if the shard is already closed, there will be no latest next record to get for this shard
-                               if (subscribedShard.isClosed()) {
-                                       nextShardItr = null;
-                               } else {
-                                       nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
-                               }
-                       } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-                               nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
-                       } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+               if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+                       // if the shard is already closed, there will be no latest next record to get for this shard
+                       if (subscribedShard.isClosed()) {
                                nextShardItr = null;
-                       } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
-                               nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
                        } else {
-                               // we will be starting from an actual sequence number (due to restore from failure).
-                               // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
-                               // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
-                               if (lastSequenceNum.isAggregated()) {
-                                       String itrForLastAggregatedRecord =
-                                               kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
-                                       // get only the last aggregated record
-                                       GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
-                                       List<UserRecord> fetchedRecords = deaggregateRecords(
-                                               getRecordsResult.getRecords(),
-                                               subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                                               subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-                                       long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
-                                       for (UserRecord record : fetchedRecords) {
-                                               // we have found a dangling sub-record if it has a larger subsequence number
-                                               // than our last sequence number; if so, collect the record and update state
-                                               if (record.getSubSequenceNumber() > lastSubSequenceNum) {
-                                                       deserializeRecordForCollectionAndUpdateState(record);
-                                               }
+                               nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+                       }
+               } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+                       nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
+               } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+                       nextShardItr = null;
+               } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+                       nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
+               } else {
+                       // we will be starting from an actual sequence number (due to restore from failure).
+                       // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
+                       // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
+
+                       if (lastSequenceNum.isAggregated()) {
+                               String itrForLastAggregatedRecord =
+                                       kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+                               // get only the last aggregated record
+                               GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
+
+                               List<UserRecord> fetchedRecords = deaggregateRecords(
+                                       getRecordsResult.getRecords(),
+                                       subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+                                       subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+                               long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
+                               for (UserRecord record : fetchedRecords) {
+                                       // we have found a dangling sub-record if it has a larger subsequence number
+                                       // than our last sequence number; if so, collect the record and update state
+                                       if (record.getSubSequenceNumber() > lastSubSequenceNum) {
+                                               deserializeRecordForCollectionAndUpdateState(record);
                                        }
-
-                                       // set the nextShardItr so we can continue iterating in the next while loop
-                                       nextShardItr = getRecordsResult.getNextShardIterator();
-                               } else {
-                                       // the last record was non-aggregated, so we can simply start from the next record
-                                       nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
                                }
+
+                               // set the nextShardItr so we can continue iterating in the next while loop
+                               nextShardItr = getRecordsResult.getNextShardIterator();
+                       } else {
+                               // the last record was non-aggregated, so we can simply start from the next record
+                               nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
                        }
+               }
+               return nextShardItr;
+       }
 
+       @SuppressWarnings("unchecked")
+       @Override
+       public void run() {
+               try {
+                       String nextShardItr = getInitialShardIterator();
                        long lastTimeNanos = 0;
                        while (isRunning()) {
                                if (nextShardItr == null) {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 9e5c6cbe450..e25a6015771 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -36,14 +36,14 @@
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
 import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
 import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.DeserializerFactory;
-import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
-import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 
 import java.io.IOException;
 import java.util.HashMap;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
index 4dcab33bd1a..49a9ed71ae0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
@@ -20,6 +20,9 @@
 
 import org.apache.flink.annotation.Internal;
 
+/**
+ * Internal use.
+ */
 @Internal
 public class TimeoutLatch {
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b7cfb2d32d1..21588c9a7a7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -72,7 +72,7 @@ public TestableKinesisDataFetcher(
                        thrownErrorUnderTest,
                        subscribedShardsStateUnderTest,
                        subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-                       fakeKinesis);
+                       (properties) -> fakeKinesis);
 
                this.runWaiter = new OneShotLatch();
                this.initialDiscoveryWaiter = new OneShotLatch();
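
For reference, the new FlinkKinesisProxyFactory introduced above is a single-method interface, so a test or a specialized fetcher can supply its own KinesisProxyInterface with a lambda, exactly as the updated TestableKinesisDataFetcher does with (properties) -> fakeKinesis. A minimal sketch of that usage follows; the ProxyFactories class and its fixed() helper are illustrative only and not part of this change:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

public final class ProxyFactories {

        // Returns a factory that hands back the same (e.g. stubbed) proxy on every
        // call, ignoring the consumer Properties. Illustrative helper only.
        public static KinesisDataFetcher.FlinkKinesisProxyFactory fixed(KinesisProxyInterface proxy) {
                return (Properties configProps) -> proxy;
        }

        private ProxyFactories() {
        }
}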


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow for ShardConsumer override in Kinesis consumer
> ----------------------------------------------------
>
>                 Key: FLINK-9926
>                 URL: https://issues.apache.org/jira/browse/FLINK-9926
>             Project: Flink
>          Issue Type: Task
>          Components: Kinesis Connector
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Minor
>              Labels: pull-request-available
>
> There are various reasons why the user may want to override the consumer. 
> Examples are to optimize the run loop or to add additional metrics or 
> logging. Instead of baking the constructor into runFetcher, create a 
> customizable factory method.
>  
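
To make that intent concrete: with the protected createShardConsumer hook introduced by this PR, a user can subclass the fetcher and decorate or replace the per-shard consumer, for example to add logging or extra metrics. A rough sketch follows; the LoggingKinesisDataFetcher name is hypothetical and its constructor is omitted (a real subclass would declare one that forwards its arguments to a KinesisDataFetcher constructor):

import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingKinesisDataFetcher<T> extends KinesisDataFetcher<T> {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingKinesisDataFetcher.class);

        // Constructor omitted for brevity; it would simply forward its arguments
        // to a matching KinesisDataFetcher constructor.

        @Override
        protected ShardConsumer createShardConsumer(
                        Integer subscribedShardStateIndex,
                        StreamShardHandle subscribedShard,
                        SequenceNumber lastSequenceNum,
                        ShardMetricsReporter shardMetricsReporter) {
                // Log which shard a consumer is being started for, then delegate to
                // the default consumer built by the base class.
                LOG.info("Starting shard consumer for {} from {}", subscribedShard, lastSequenceNum);
                return super.createShardConsumer(
                        subscribedShardStateIndex, subscribedShard, lastSequenceNum, shardMetricsReporter);
        }
}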



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
