dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r468867443
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
*/
@Internal
public class ShardConsumer<T> implements Runnable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ShardConsumer.class);
-
- // AWS Kinesis has a read limit of 2 Mb/sec
- //
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
- private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 *
1024L * 1024L;
-
private final KinesisDeserializationSchema<T> deserializer;
- private final KinesisProxyInterface kinesis;
-
private final int subscribedShardStateIndex;
private final KinesisDataFetcher<T> fetcherRef;
private final StreamShardHandle subscribedShard;
- private int maxNumberOfRecordsPerFetch;
- private final long fetchIntervalMillis;
- private final boolean useAdaptiveReads;
+ private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
- private final ShardMetricsReporter shardMetricsReporter;
+ private StartingPosition startingPosition;
private SequenceNumber lastSequenceNum;
- private Date initTimestamp;
+ private final RecordPublisher recordPublisher;
/**
* Creates a shard consumer.
*
* @param fetcherRef reference to the owning fetcher
+ * @param recordPublisher the record publisher used to read records
from kinesis
* @param subscribedShardStateIndex the state index of the shard this
consumer is subscribed to
* @param subscribedShard the shard this consumer is subscribed to
* @param lastSequenceNum the sequence number in the shard to start
consuming
- * @param kinesis the proxy instance to interact with Kinesis
- * @param shardMetricsReporter the reporter to report metrics to
+ * @param shardConsumerMetricsReporter the reporter to report metrics to
+ * @param shardDeserializer used to deserialize incoming records
*/
public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+ RecordPublisher recordPublisher,
Integer
subscribedShardStateIndex,
StreamShardHandle
subscribedShard,
SequenceNumber lastSequenceNum,
- KinesisProxyInterface kinesis,
- ShardMetricsReporter
shardMetricsReporter,
+ ShardConsumerMetricsReporter
shardConsumerMetricsReporter,
KinesisDeserializationSchema<T>
shardDeserializer) {
this.fetcherRef = checkNotNull(fetcherRef);
+ this.recordPublisher = checkNotNull(recordPublisher);
this.subscribedShardStateIndex =
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
+ this.shardConsumerMetricsReporter =
checkNotNull(shardConsumerMetricsReporter);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
- this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
checkArgument(
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
"Should not start a ShardConsumer if the shard has
already been completely read.");
this.deserializer = shardDeserializer;
Properties consumerConfig =
fetcherRef.getConsumerConfiguration();
- this.kinesis = kinesis;
- this.maxNumberOfRecordsPerFetch =
Integer.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
- this.fetchIntervalMillis =
Long.valueOf(consumerConfig.getProperty(
-
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
- this.useAdaptiveReads =
Boolean.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-
Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
- if
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
{
+
+ if
(lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+ Date initTimestamp;
String timestamp =
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
try {
String format =
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
SimpleDateFormat customDateFormat = new
SimpleDateFormat(format);
- this.initTimestamp =
customDateFormat.parse(timestamp);
+ initTimestamp =
customDateFormat.parse(timestamp);
} catch (IllegalArgumentException |
NullPointerException exception) {
throw new IllegalArgumentException(exception);
} catch (ParseException exception) {
- this.initTimestamp = new Date((long)
(Double.parseDouble(timestamp) * 1000));
+ initTimestamp = new Date((long)
(Double.parseDouble(timestamp) * 1000));
}
- } else {
- this.initTimestamp = null;
- }
- }
-
- /**
- * Returns a shard iterator for the given {@link SequenceNumber}.
- *
- * @return shard iterator
- * @throws Exception
- */
- protected String getShardIterator(SequenceNumber sequenceNumber) throws
Exception {
- if (isSentinelSequenceNumber(sequenceNumber)) {
- return getShardIteratorForSentinel(sequenceNumber);
+ startingPosition =
StartingPosition.fromTimestamp(initTimestamp);
} else {
- // we will be starting from an actual sequence number
(due to restore from failure).
- return
getShardIteratorForRealSequenceNumber(sequenceNumber);
+ startingPosition =
StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
}
}
- protected String getShardIteratorForSentinel(SequenceNumber
sentinelSequenceNumber) throws InterruptedException {
- String nextShardItr;
-
- if
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
{
- // if the shard is already closed, there will be no
latest next record to get for this shard
- if (subscribedShard.isClosed()) {
- nextShardItr = null;
- } else {
- nextShardItr =
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(),
null);
- }
- } else if
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
{
- nextShardItr =
kinesis.getShardIterator(subscribedShard,
ShardIteratorType.TRIM_HORIZON.toString(), null);
- } else if
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
{
- nextShardItr = null;
- } else if
(sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
{
- nextShardItr =
kinesis.getShardIterator(subscribedShard,
ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
- } else {
- throw new RuntimeException("Unknown sentinel type: " +
sentinelSequenceNumber);
- }
-
- return nextShardItr;
- }
-
- protected String getShardIteratorForRealSequenceNumber(SequenceNumber
sequenceNumber)
- throws Exception {
-
- // if the last sequence number refers to an aggregated record,
we need to clean up any dangling sub-records
- // from the last aggregated record; otherwise, we can simply
start iterating from the record right after.
-
- if (sequenceNumber.isAggregated()) {
- return
getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
- } else {
- // the last record was non-aggregated, so we can simply
start from the next record
- return kinesis.getShardIterator(
- subscribedShard,
-
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
- sequenceNumber.getSequenceNumber());
- }
- }
-
- protected String
getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
- throws Exception {
-
- String itrForLastAggregatedRecord =
- kinesis.getShardIterator(
- subscribedShard,
-
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-
sequenceNumber.getSequenceNumber());
-
- // get only the last aggregated record
- GetRecordsResult getRecordsResult =
getRecords(itrForLastAggregatedRecord, 1);
-
- List<UserRecord> fetchedRecords = deaggregateRecords(
- getRecordsResult.getRecords(),
-
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
- for (UserRecord record : fetchedRecords) {
- // we have found a dangling sub-record if it has a
larger subsequence number
- // than our last sequence number; if so, collect the
record and update state
- if (record.getSubSequenceNumber() > lastSubSequenceNum)
{
-
deserializeRecordForCollectionAndUpdateState(record);
- }
- }
-
- return getRecordsResult.getNextShardIterator();
- }
-
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
- String nextShardItr = getShardIterator(lastSequenceNum);
+ while (isRunning()) {
+ final RecordPublisherRunResult result =
recordPublisher.run(startingPosition, batch -> {
+ for (UserRecord userRecord :
batch.getDeaggregatedRecords()) {
+ if
(filterDeaggregatedRecord(userRecord)) {
+
deserializeRecordForCollectionAndUpdateState(userRecord);
+ }
+ }
- long processingStartTimeNanos = System.nanoTime();
+
shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+
shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+
shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+
ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
+ });
- while (isRunning()) {
- if (nextShardItr == null) {
+ if (result == COMPLETE) {
fetcherRef.updateState(subscribedShardStateIndex,
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
// we can close this consumer thread
once we've reached the end of the subscribed shard
break;
} else {
-
shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
- GetRecordsResult getRecordsResult =
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
- List<Record> aggregatedRecords =
getRecordsResult.getRecords();
- int numberOfAggregatedRecords =
aggregatedRecords.size();
-
shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
- // each of the Kinesis records may be
aggregated, so we must deaggregate them before proceeding
- List<UserRecord> fetchedRecords =
deaggregateRecords(
- aggregatedRecords,
-
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- long recordBatchSizeBytes = 0L;
- for (UserRecord record :
fetchedRecords) {
- recordBatchSizeBytes +=
record.getData().remaining();
-
deserializeRecordForCollectionAndUpdateState(record);
- }
-
- int numberOfDeaggregatedRecords =
fetchedRecords.size();
-
shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
- nextShardItr =
getRecordsResult.getNextShardIterator();
-
- long adjustmentEndTimeNanos =
adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
- long runLoopTimeNanos =
adjustmentEndTimeNanos - processingStartTimeNanos;
- maxNumberOfRecordsPerFetch =
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(),
recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
-
shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
- processingStartTimeNanos =
adjustmentEndTimeNanos; // for next time through the loop
+ startingPosition =
StartingPosition.continueFromSequenceNumber(lastSequenceNum);
Review comment:
See other comment regarding `StartingPosition`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org