jihoonson opened a new pull request #7351: Support kinesis compatibility URL: https://github.com/apache/incubator-druid/pull/7351 First of all, this PR isn't strictly necessary for Apache Druid because Kinesis indexing service was introduced in 0.14.0. Nonetheless, I would like to ask opinions of other people about merging this PR to the master. The kinesis indexing service was a proprietary extension of Imply, but we ended up contributing it to Apache Druid because we think it would help to expand use cases of Druid. So, it's new for Apache Druid, but it's not for Imply users and there is a compatibility issue mainly because `KinesisIndexTaskIOConfig` was changed in https://github.com/apache/incubator-druid/pull/7291. So, if this PR is not merged into master, we should maintain this code change manually in our version. I would like to avoid this manual maintenance and probably this PR can be merged because it just adds some methods which are used for JSON serde. I would like to ask committers outside of Imply to review this PR. FYI, the old version of `KinesisIndexTaskIOConfig` is below. ```java public class KinesisIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_PAUSE_AFTER_READ = true; private static final int DEFAULT_RECORDS_PER_FETCH = 4000; private static final int DEFAULT_FETCH_DELAY_MILLIS = 0; private final String baseSequenceName; private final KinesisPartitions startPartitions; private final KinesisPartitions endPartitions; private final boolean useTransaction; private final boolean pauseAfterRead; private final Optional<DateTime> minimumMessageTime; private final Optional<DateTime> maximumMessageTime; private final String endpoint; private final Integer recordsPerFetch; private final Integer fetchDelayMillis; private final String awsAccessKeyId; private final String awsSecretAccessKey; private final Set<String> exclusiveStartSequenceNumberPartitions; private final String awsAssumedRoleArn; private final String awsExternalId; private final boolean deaggregate; @JsonCreator public KinesisIOConfig( @JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("startPartitions") KinesisPartitions startPartitions, @JsonProperty("endPartitions") KinesisPartitions endPartitions, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("endpoint") String endpoint, @JsonProperty("recordsPerFetch") Integer recordsPerFetch, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAccessKeyId") String awsAccessKeyId, @JsonProperty("awsSecretAccessKey") String awsSecretAccessKey, @JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsExternalId") String awsExternalId, @JsonProperty("deaggregate") boolean deaggregate ) { this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions"); this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint"); this.recordsPerFetch = recordsPerFetch != null ? recordsPerFetch : DEFAULT_RECORDS_PER_FETCH; this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : DEFAULT_FETCH_DELAY_MILLIS; this.awsAccessKeyId = awsAccessKeyId; this.awsSecretAccessKey = awsSecretAccessKey; this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions; this.awsAssumedRoleArn = awsAssumedRoleArn; this.awsExternalId = awsExternalId; this.deaggregate = deaggregate; Preconditions.checkArgument( startPartitions.getStream().equals(endPartitions.getStream()), "start stream and end stream must match" ); Preconditions.checkArgument( startPartitions.getPartitionSequenceNumberMap() .keySet() .equals(endPartitions.getPartitionSequenceNumberMap().keySet()), "start partition set and end partition set must match" ); } @JsonProperty public String getBaseSequenceName() { return baseSequenceName; } @JsonProperty public KinesisPartitions getStartPartitions() { return startPartitions; } @JsonProperty public KinesisPartitions getEndPartitions() { return endPartitions; } @JsonProperty public boolean isUseTransaction() { return useTransaction; } @JsonProperty public boolean isPauseAfterRead() { return pauseAfterRead; } @JsonProperty public Optional<DateTime> getMinimumMessageTime() { return minimumMessageTime; } @JsonProperty public Optional<DateTime> getMaximumMessageTime() { return maximumMessageTime; } @JsonProperty public String getEndpoint() { return endpoint; } @JsonProperty public int getRecordsPerFetch() { return recordsPerFetch; } @JsonProperty public int getFetchDelayMillis() { return fetchDelayMillis; } @JsonProperty public String getAwsAccessKeyId() { return awsAccessKeyId; } @JsonProperty public String getAwsSecretAccessKey() { return awsSecretAccessKey; } @JsonProperty public Set<String> getExclusiveStartSequenceNumberPartitions() { return exclusiveStartSequenceNumberPartitions; } @JsonProperty public String getAwsAssumedRoleArn() { return awsAssumedRoleArn; } @JsonProperty public String getAwsExternalId() { return awsExternalId; } @JsonProperty public boolean isDeaggregate() { return deaggregate; } @Override public String toString() { return "KinesisIOConfig{" + "baseSequenceName='" + baseSequenceName + '\'' + ", startPartitions=" + startPartitions + ", endPartitions=" + endPartitions + ", useTransaction=" + useTransaction + ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + ", maximumMessageTime=" + maximumMessageTime + ", endpoint='" + endpoint + '\'' + ", recordsPerFetch=" + recordsPerFetch + ", fetchDelayMillis=" + fetchDelayMillis + ", awsAccessKeyId='" + awsAccessKeyId + '\'' + ", awsSecretAccessKey=" + "************************" + ", exclusiveStartSequenceNumberPartitions=" + exclusiveStartSequenceNumberPartitions + ", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' + ", awsExternalId='" + awsExternalId + '\'' + ", deaggregate=" + deaggregate + '}'; } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
