xiaolong-sn commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r454251135



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyMarker.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A marker interface for generic Kinesis Proxy.
+ */
+@Internal

Review comment:
       What does this interface represent besides the existing `KinesisProxyInterface`?
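
       For reference, the new file appears to add only an empty marker. A rough sketch of what I am picturing (the `extends` relationship is my assumption, not something this diff shows):

```java
/** A marker interface for generic Kinesis Proxy. */
@Internal
public interface KinesisProxyMarker {
	// intentionally empty
}

/** Hypothetically, the existing proxy interface could then extend the marker. */
@Internal
public interface KinesisProxyInterface extends KinesisProxyMarker {
	GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
	// ... other existing methods unchanged
}
```

       If that is the intent, could the Javadoc spell out what the marker is actually used for?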

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -379,49 +205,18 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
        }
 
        /**
-        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-        * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-        * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-        * be used for the next call to this method.
-        *
-        * <p>Note: it is important that this method is not called again before all the records from the last result have been
-        * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-        * incorrect shard iteration if the iterator had to be refreshed.
+        * Filters out aggregated records that have previously been processed.
+        * This method is to support restarting from a partially consumed aggregated sequence number.
         *
-        * @param shardItr shard iterator to use
-        * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-        * @return get records result
-        * @throws InterruptedException
+        * @param record the record to filter
+        * @return {@code true} if the record should be retained
         */
-       private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
-               GetRecordsResult getRecordsResult = null;
-               while (getRecordsResult == null) {
-                       try {
-                               getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
-                               // Update millis behind latest so it gets reported by the millisBehindLatest gauge
-                               Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
-                               if (millisBehindLatest != null) {
-                                       shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-                               }
-                       } catch (ExpiredIteratorException eiEx) {
-                               LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-                                       " refreshing the iterator ...", shardItr, subscribedShard);
-
-                               shardItr = getShardIterator(lastSequenceNum);
-
-                               // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-                               if (fetchIntervalMillis != 0) {
-                                       Thread.sleep(fetchIntervalMillis);
-                               }
-                       }
+       private boolean filterDeaggregatedRecord(final UserRecord record) {
+               if (lastSequenceNum.isAggregated()) {

Review comment:
       What does this function do? The name alone does not make the intent clear.
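
       My reading of the Javadoc is that it drops sub-records of an aggregated record that were already emitted before a restart, so they are not re-emitted. A rough sketch of that intent (the accessor names on `UserRecord`/`SequenceNumber` are assumed from the surrounding diff, not verified against this PR):

```java
private boolean filterDeaggregatedRecord(final UserRecord record) {
	if (lastSequenceNum.isAggregated()) {
		// We restarted part-way through an aggregated record: drop sub-records of that
		// same record up to and including the last sub-sequence number already emitted.
		return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber())
			|| record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
	}
	// Not resuming inside an aggregated record: keep everything.
	return true;
}
```

       If that is the behaviour, maybe rename it to something like `shouldEmitRecord`, or extend the Javadoc, since "filter" does not say which side is kept.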

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+       private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+       private final KinesisProxyInterface kinesisProxy;
+
+       private final StreamShardHandle subscribedShard;
+
+       private String nextShardItr;
+
+       private final int maxNumberOfRecordsPerFetch;
+
+       private final long expiredIteratorBackoffMillis;
+
+       /**
+        * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+        * The following KDS services are used: GetRecords and GetShardIterator.
+        *
+        * @param subscribedShard the shard in which to consume from
+        * @param metricsReporter a metric reporter used to output metrics
+        * @param kinesisProxy the proxy used to communicate with kinesis
+        * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+        * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+        */
+       public PollingRecordPublisher(
+                       final StreamShardHandle subscribedShard,
+                       final PollingRecordPublisherMetricsReporter metricsReporter,
+                       final KinesisProxyInterface kinesisProxy,
+                       final int maxNumberOfRecordsPerFetch,
+                       final long expiredIteratorBackoffMillis) {
+               this.subscribedShard = subscribedShard;
+               this.metricsReporter = metricsReporter;
+               this.kinesisProxy = kinesisProxy;
+               this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+               this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+       }
+
+       @Override
+       public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer) throws InterruptedException {
+               return run(startingPosition, consumer, maxNumberOfRecordsPerFetch);
+       }
+
+       public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer, int maxNumberOfRecords) throws InterruptedException {
+               if (nextShardItr == null) {
+                       nextShardItr = getShardIterator(startingPosition);
+               }
+
+               if (nextShardItr == null) {
+                       return COMPLETE;
+               }
+
+               metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+               GetRecordsResult result = getRecords(nextShardItr, startingPosition, maxNumberOfRecords);
+
+               consumer.accept(new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest()));
+
+               nextShardItr = result.getNextShardIterator();
+               return nextShardItr == null ? COMPLETE : INCOMPLETE;
+       }
+
+       /**
+        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+        * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+        * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+        * be used for the next call to this method.
+        *
+        * <p>Note: it is important that this method is not called again before all the records from the last result have been
+        * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+        * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+        * incorrect shard iteration if the iterator had to be refreshed.
+        *
+        * @param shardItr shard iterator to use
+        * @param startingPosition the position in the stream in which to consume from
+        * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+        * @return get records result
+        */
+       private GetRecordsResult getRecords(String shardItr, final StartingPosition startingPosition, int maxNumberOfRecords) throws InterruptedException {
+               GetRecordsResult getRecordsResult = null;
+               while (getRecordsResult == null) {
+                       try {
+                               getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+                       } catch (ExpiredIteratorException | InterruptedException eiEx) {

Review comment:
       Should we also consider other recoverable exceptions like `ProvisionedThroughputExceededException` here?
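
       For illustration only, the loop could treat throttling as recoverable in a similar way (a sketch, not the PR's code; it reuses `expiredIteratorBackoffMillis` purely for the example, and `com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException` would need to be imported):

```java
try {
	getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
} catch (ExpiredIteratorException eiEx) {
	// Existing idea: refresh the iterator, back off, then retry.
	shardItr = getShardIterator(startingPosition);
	Thread.sleep(expiredIteratorBackoffMillis);
} catch (ProvisionedThroughputExceededException ptEx) {
	// Hypothetical addition: keep the same iterator, just back off before retrying.
	LOG.warn("GetRecords throttled for shard {}; backing off before retrying ...", subscribedShard);
	Thread.sleep(expiredIteratorBackoffMillis);
}
```

       Or is throttling already retried inside `KinesisProxy` itself, making this unnecessary here?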

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
##########
@@ -0,0 +1,106 @@
+/*

Review comment:
       This class looks quite similar to the existing `SentinelSequenceNumber` class. Is there any possibility of combining the two?
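
       For comparison, the existing class is just an enum of special `SequenceNumber` values (this snippet reflects the existing Flink code as I understand it; whether `StartingPosition` could be built directly from these sentinels is the open question):

```java
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

// Sentinels the consumer already uses to mean "not a real Kinesis sequence number":
SequenceNumber latest   = SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get();
SequenceNumber earliest = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
```

       If `StartingPosition` mostly re-encodes these cases plus a timestamp, perhaps it could be derived from them (for example via a factory taking a `SequenceNumber`) rather than maintained as a parallel concept.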




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

