xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453064



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties 
configProps) {
                SequenceNumber lastSequenceNum,
                MetricGroup metricGroup,
                KinesisDeserializationSchema<T> shardDeserializer) {
+               return createShardConsumer(
+                       subscribedShardStateIndex,
+                       subscribedShard,
+                       lastSequenceNum,
+                       metricGroup,
+                       shardDeserializer,
+                       null
+               );
+       }
+
+       /**
+        * Create a new shard consumer.
+        * Override this method to customize shard consumer behavior in 
subclasses.
+        * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
+        * @param subscribedShard the shard this consumer is subscribed to
+        * @param lastSequenceNum the sequence number in the shard to start 
consuming
+        * @param metricGroup the metric group to report metrics to
+        * @param streamInfoList the stream info used for enhanced fan-out to 
consume from
+        * @return shard consumer
+        */
+       protected ShardConsumer<T> createShardConsumer(
+               Integer subscribedShardStateIndex,
+               StreamShardHandle subscribedShard,
+               SequenceNumber lastSequenceNum,
+               MetricGroup metricGroup,
+               KinesisDeserializationSchema<T> shardDeserializer,
+               @Nullable List<FanOutStreamInfo> streamInfoList) {

Review comment:
       I reverted the change to ShardConsumer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to