dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r468867578
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
*/
@Internal
public class ShardConsumer<T> implements Runnable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ShardConsumer.class);
-
- // AWS Kinesis has a read limit of 2 Mb/sec
- //
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
- private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 *
1024L * 1024L;
-
private final KinesisDeserializationSchema<T> deserializer;
- private final KinesisProxyInterface kinesis;
-
private final int subscribedShardStateIndex;
private final KinesisDataFetcher<T> fetcherRef;
private final StreamShardHandle subscribedShard;
- private int maxNumberOfRecordsPerFetch;
- private final long fetchIntervalMillis;
- private final boolean useAdaptiveReads;
+ private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
- private final ShardMetricsReporter shardMetricsReporter;
+ private StartingPosition startingPosition;
private SequenceNumber lastSequenceNum;
- private Date initTimestamp;
+ private final RecordPublisher recordPublisher;
/**
* Creates a shard consumer.
*
* @param fetcherRef reference to the owning fetcher
+ * @param recordPublisher the record publisher used to read records
from kinesis
* @param subscribedShardStateIndex the state index of the shard this
consumer is subscribed to
* @param subscribedShard the shard this consumer is subscribed to
* @param lastSequenceNum the sequence number in the shard to start
consuming
- * @param kinesis the proxy instance to interact with Kinesis
- * @param shardMetricsReporter the reporter to report metrics to
+ * @param shardConsumerMetricsReporter the reporter to report metrics to
+ * @param shardDeserializer used to deserialize incoming records
*/
public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+ RecordPublisher recordPublisher,
Integer
subscribedShardStateIndex,
StreamShardHandle
subscribedShard,
SequenceNumber lastSequenceNum,
- KinesisProxyInterface kinesis,
- ShardMetricsReporter
shardMetricsReporter,
+ ShardConsumerMetricsReporter
shardConsumerMetricsReporter,
KinesisDeserializationSchema<T>
shardDeserializer) {
this.fetcherRef = checkNotNull(fetcherRef);
+ this.recordPublisher = checkNotNull(recordPublisher);
this.subscribedShardStateIndex =
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
+ this.shardConsumerMetricsReporter =
checkNotNull(shardConsumerMetricsReporter);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
- this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
checkArgument(
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
"Should not start a ShardConsumer if the shard has
already been completely read.");
this.deserializer = shardDeserializer;
Properties consumerConfig =
fetcherRef.getConsumerConfiguration();
- this.kinesis = kinesis;
- this.maxNumberOfRecordsPerFetch =
Integer.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
- this.fetchIntervalMillis =
Long.valueOf(consumerConfig.getProperty(
-
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
- this.useAdaptiveReads =
Boolean.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-
Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
- if
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
{
+
+ if
(lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+ Date initTimestamp;
String timestamp =
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
try {
String format =
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
SimpleDateFormat customDateFormat = new
SimpleDateFormat(format);
- this.initTimestamp =
customDateFormat.parse(timestamp);
+ initTimestamp =
customDateFormat.parse(timestamp);
} catch (IllegalArgumentException |
NullPointerException exception) {
throw new IllegalArgumentException(exception);
} catch (ParseException exception) {
- this.initTimestamp = new Date((long)
(Double.parseDouble(timestamp) * 1000));
+ initTimestamp = new Date((long)
(Double.parseDouble(timestamp) * 1000));
}
- } else {
- this.initTimestamp = null;
- }
- }
-
- /**
- * Returns a shard iterator for the given {@link SequenceNumber}.
- *
- * @return shard iterator
- * @throws Exception
- */
- protected String getShardIterator(SequenceNumber sequenceNumber) throws
Exception {
- if (isSentinelSequenceNumber(sequenceNumber)) {
- return getShardIteratorForSentinel(sequenceNumber);
+ startingPosition =
StartingPosition.fromTimestamp(initTimestamp);
} else {
- // we will be starting from an actual sequence number
(due to restore from failure).
- return
getShardIteratorForRealSequenceNumber(sequenceNumber);
+ startingPosition =
StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
}
}
- protected String getShardIteratorForSentinel(SequenceNumber
sentinelSequenceNumber) throws InterruptedException {
Review comment:
👍
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org