jon-wei commented on a change in pull request #7351: Support kinesis
compatibility
URL: https://github.com/apache/incubator-druid/pull/7351#discussion_r273276480
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
##########
@@ -84,6 +96,99 @@ public KinesisIndexTaskIOConfig(
this.deaggregate = deaggregate;
}
+ /**
+  * Convenience constructor for callers that supply the start and end sequence numbers directly.
+  * It forwards every argument unchanged to the full constructor and passes {@code null} for that
+  * constructor's third, fourth and fifth parameters.
+  */
+ public KinesisIndexTaskIOConfig(
+     int taskGroupId,
+     String baseSequenceName,
+     SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
+     SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
+     Boolean useTransaction,
+     DateTime minimumMessageTime,
+     DateTime maximumMessageTime,
+     String endpoint,
+     Integer recordsPerFetch,
+     Integer fetchDelayMillis,
+     String awsAssumedRoleArn,
+     String awsExternalId,
+     boolean deaggregate
+ )
+ {
+   // NOTE(review): the three nulls presumably fill the deprecated startPartitions, endPartitions and
+   // exclusiveStartSequenceNumberPartitions slots of the full constructor -- confirm against its signature.
+   this(
+       taskGroupId,
+       baseSequenceName,
+       null,
+       null,
+       null,
+       startSequenceNumbers,
+       endSequenceNumbers,
+       useTransaction,
+       minimumMessageTime,
+       maximumMessageTime,
+       endpoint,
+       recordsPerFetch,
+       fetchDelayMillis,
+       awsAssumedRoleArn,
+       awsExternalId,
+       deaggregate
+   );
+ }
+
+ /**
+  * Picks the start sequence numbers to use from the two representations this config accepts.
+  *
+  * If {@code newStartSequenceNumbers} is non-null it is returned as-is. Otherwise a new
+  * {@link SeekableStreamStartSequenceNumbers} is built from the stream and partition-to-sequence-number
+  * map of {@code oldStartSequenceNumbers}, together with {@code exclusiveStartSequenceNumberPartitions}.
+  *
+  * @param newStartSequenceNumbers                start sequence numbers in the current form, may be null
+  * @param oldStartSequenceNumbers                start positions carried in the legacy
+  *                                               {@link SeekableStreamEndSequenceNumbers} type, may be null
+  *                                               only when {@code newStartSequenceNumbers} is given
+  * @param exclusiveStartSequenceNumberPartitions partitions passed through to the rebuilt start sequence
+  *                                               numbers; only used on the legacy path
+  *
+  * @return the start sequence numbers to use
+  *
+  * @throws NullPointerException if both {@code newStartSequenceNumbers} and
+  *                              {@code oldStartSequenceNumbers} are null
+  */
+ private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers(
+     @Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers,
+     @Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers,
+     @Nullable Set<String> exclusiveStartSequenceNumberPartitions
+ )
+ {
+   // The current representation wins whenever it is present.
+   if (newStartSequenceNumbers != null) {
+     return newStartSequenceNumbers;
+   }
+
+   // Legacy path: at least one of the two representations has to be supplied.
+   Preconditions.checkNotNull(
+       oldStartSequenceNumbers,
+       "Either startSequenceNumbers or startPartitions shouldn't be null"
+   );
+
+   return new SeekableStreamStartSequenceNumbers<>(
+       oldStartSequenceNumbers.getStream(),
+       oldStartSequenceNumbers.getPartitionSequenceNumberMap(),
+       exclusiveStartSequenceNumberPartitions
+   );
+ }
+
+ /**
+  * This method is for compatibility so that newer version of KinesisIndexTaskIOConfig can be read by
+  * old version of Druid. Note that this method returns end sequence numbers instead of start. This is because
+  * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
+  *
+  * The returned object is built only from the stream and the partition-to-sequence-number map of
+  * {@link #getStartSequenceNumbers()}; nothing else from the start sequence numbers is copied into it.
+  *
+  * @return the start positions repackaged as {@link SeekableStreamEndSequenceNumbers}
+  *
+  * @deprecated use {@link #getStartSequenceNumbers()} instead
+  */
+ @JsonProperty
+ @Deprecated
+ public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+ {
+   // Repackage the start sequence numbers in the legacy end-sequence-numbers type. Only the stream and the
+   // partition -> sequence number map are carried over.
+   final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers();
+   return new SeekableStreamEndSequenceNumbers<>(
+       startSequenceNumbers.getStream(),
+       startSequenceNumbers.getPartitionSequenceNumberMap()
+   );
+ }
+
+ /**
+ * This method is for compatibilty so that newer version of
KafkaIndexTaskIOConfig can be read by
Review comment:
This Javadoc should mention `KinesisIndexTaskIOConfig` as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]