dannycranmer commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r461532730
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -362,6 +440,10 @@ protected KinesisDataFetcher(List<String> streams,
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesis = kinesisProxyFactory.create(configProps);
+ this.kinesisProxyV2Factory =
checkNotNull(kinesisProxyV2Factory);
+ if (shouldRegisterConsumerEagerly()) {
Review comment:
@xiaolong-sn `EAGER` should be done in the `FlinkKinesisConsumer`
constructor, `LAZY` should be done here
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties
configProps) {
SequenceNumber lastSequenceNum,
MetricGroup metricGroup,
KinesisDeserializationSchema<T> shardDeserializer) {
+ return createShardConsumer(
+ subscribedShardStateIndex,
+ subscribedShard,
+ lastSequenceNum,
+ metricGroup,
+ shardDeserializer,
+ null
+ );
+ }
+
+ /**
+ * Create a new shard consumer.
+ * Override this method to customize shard consumer behavior in
subclasses.
+ * @param subscribedShardStateIndex the state index of the shard this
consumer is subscribed to
+ * @param subscribedShard the shard this consumer is subscribed to
+ * @param lastSequenceNum the sequence number in the shard to start
consuming
+ * @param metricGroup the metric group to report metrics to
+ * @param streamInfoList the stream info used for enhanced fan-out to
consume from
+ * @return shard consumer
+ */
+ protected ShardConsumer<T> createShardConsumer(
+ Integer subscribedShardStateIndex,
+ StreamShardHandle subscribedShard,
+ SequenceNumber lastSequenceNum,
+ MetricGroup metricGroup,
+ KinesisDeserializationSchema<T> shardDeserializer,
+ @Nullable List<FanOutStreamInfo> streamInfoList) {
Review comment:
@xiaolong-sn why do we need to pass all the stream info? A
`ShardConsumer` is only concerned about a single stream
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##########
@@ -134,6 +134,46 @@
*/
private final int listStreamConsumersMaxRetries;
+ /**
+ * Max retries for the describe stream operation.
+ */
+ private final int describeStreamMaxRetries;
Review comment:
@xiaolong-sn it would make sense to move the `DescribeStream` bits to
you config PR since that has not been reviewed yet
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+ /** Kinesis stream name. */
+ private final String stream;
+
+ /** Kinesis stream arn. */
+ private final String streamArn;
+
+ /** Registered consumer name for the related stream. */
+ private final String consumerName;
+
+ /** Registered consumer arn for the related stream. */
+ private final String consumerArn;
+
+ /**
+ * Return the Kinesis stream name.
+ */
+ public String getStream() {
+ return stream;
+ }
+
+ /**
+ * Return the Kinesis stream arn.
+ */
+ public String getStreamArn() {
+ return streamArn;
+ }
+
+ /**
+ * Return the Kinesis consumer name for an enhanced fan-out consumer.
+ */
+ public String getConsumerName() {
+ return consumerName;
+ }
+
+ /**
+ * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+ */
+ public String getConsumerArn() {
+ return consumerArn;
+ }
+
+ /**
+ * Public constructor for fan out stream info.
+ */
+ public FanOutStreamInfo(String stream, String streamArn, String
consumerName, String consumerArn) {
Review comment:
@xiaolong-sn nit: constructors should go above methods
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
Review comment:
@xiaolong-sn I had changed this class to not construct it dependencies
inline with Dependency Inversion (SOLID) principle
(https://github.com/apache/flink/pull/13005/commits/b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a)
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
+ Map<String, String> result = new HashMap<>();
+ for (String stream : streams) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest
+ .builder()
+ .streamName(stream)
+ .build();
+ DescribeStreamResponse describeStreamResponse = null;
+
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() &&
describeStreamResponse == null) {
+ try {
+ describeStreamResponse =
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+ } catch (ExecutionException ex) {
+ if
(AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis =
fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to describe stream " + stream + ". Backing
off for "
+ + backoffMillis + "
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+ if (describeStreamResponse == null) {
+ throw new RuntimeException("Retries exceeded
for describeStream operation - all " +
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ result.put(stream,
describeStreamResponse.streamDescription().streamARN());
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FanOutStreamInfo> registerStreamConsumer(Map<String,
String> streamArns) throws InterruptedException, ExecutionException {
+
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+ String consumerName =
fanOutRecordPublisherConfiguration.getConsumerName().get();
+ List<FanOutStreamInfo> result = new ArrayList<>();
+ for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+ String stream = entry.getKey();
+ String streamArn = entry.getValue();
+ RegisterStreamConsumerRequest
registerStreamConsumerRequest = RegisterStreamConsumerRequest
+ .builder()
+ .consumerName(consumerName)
+ .streamARN(streamArn)
+ .build();
+ FanOutStreamInfo fanOutStreamInfo = null;
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() &&
fanOutStreamInfo == null) {
+ try {
+ RegisterStreamConsumerResponse
registerStreamConsumerResponse =
kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+ fanOutStreamInfo = new
FanOutStreamInfo(stream, streamArn, consumerName,
registerStreamConsumerResponse.consumer().consumerARN());
+ } catch (ExecutionException ex) {
+ if (AwsV2Util.isResourceInUse(ex)) {
+ fanOutStreamInfo =
describeStreamConsumer(stream, streamArn, consumerName);
+ } else if
(AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis =
fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to register " + stream + ". Backing off for "
+ + backoffMillis + "
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ if (fanOutStreamInfo == null) {
+ throw new RuntimeException("Retries exceeded
for registerStream operation - all " +
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ result.add(fanOutStreamInfo);
+ }
+ return result;
+ }
+
+ public FanOutStreamInfo describeStreamConsumer(String stream, String
streamArn, String consumerName) throws InterruptedException, ExecutionException
{
+ DescribeStreamConsumerRequest describeStreamConsumerRequest =
DescribeStreamConsumerRequest
+ .builder()
+ .streamARN(streamArn)
+ .consumerName(consumerName)
+ .build();
+ FanOutStreamInfo fanOutStreamInfo = null;
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() &&
fanOutStreamInfo == null) {
+ try {
+ DescribeStreamConsumerResponse
describeStreamConsumerResponse =
kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+ fanOutStreamInfo = new FanOutStreamInfo(stream,
streamArn, consumerName,
describeStreamConsumerResponse.consumerDescription().consumerARN());
+ } catch (ExecutionException ex) {
+ if (AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis = fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to describe stream consumer " + stream + ".
Backing off for "
+ + backoffMillis + " millis (" +
ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ if (fanOutStreamInfo == null) {
+ throw new RuntimeException("Retries exceeded for
registerStream operation - all " +
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ return fanOutStreamInfo;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void deregisterStreamConsumer(List<FanOutStreamInfo>
fanOutStreamInfos) throws InterruptedException, ExecutionException {
Review comment:
@xiaolong-sn again please make this singular (single deregistration at a
time)
Also this needs to open with describe and stagger too as per the FLIP flow
chart to deal with competing requests
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties
configProps) {
SequenceNumber lastSequenceNum,
MetricGroup metricGroup,
KinesisDeserializationSchema<T> shardDeserializer) {
+ return createShardConsumer(
+ subscribedShardStateIndex,
+ subscribedShard,
+ lastSequenceNum,
+ metricGroup,
+ shardDeserializer,
+ null
+ );
+ }
+
+ /**
+ * Create a new shard consumer.
+ * Override this method to customize shard consumer behavior in
subclasses.
+ * @param subscribedShardStateIndex the state index of the shard this
consumer is subscribed to
+ * @param subscribedShard the shard this consumer is subscribed to
+ * @param lastSequenceNum the sequence number in the shard to start
consuming
+ * @param metricGroup the metric group to report metrics to
+ * @param streamInfoList the stream info used for enhanced fan-out to
consume from
+ * @return shard consumer
+ */
+ protected ShardConsumer<T> createShardConsumer(
+ Integer subscribedShardStateIndex,
+ StreamShardHandle subscribedShard,
+ SequenceNumber lastSequenceNum,
+ MetricGroup metricGroup,
+ KinesisDeserializationSchema<T> shardDeserializer,
+ @Nullable List<FanOutStreamInfo> streamInfoList) {
final KinesisProxyInterface kinesis =
kinesisProxyFactory.create(configProps);
final RecordPublisher recordPublisher = new
PollingRecordPublisherFactory()
.create(configProps, metricGroup, subscribedShard,
kinesis);
- return new ShardConsumer<>(
+ return new ShardConsumer<T>(
this,
recordPublisher,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
new ShardConsumerMetricsReporter(metricGroup),
- shardDeserializer);
+ shardDeserializer,
+ configProps,
Review comment:
@xiaolong-sn all these additional properties should not be required once
you move LAZY. Remember we have a single stream consumer per stream, not shard.
So we should perform lazy in the `KinesisDataFetcher` (once per task manager)
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
+ Map<String, String> result = new HashMap<>();
+ for (String stream : streams) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest
+ .builder()
+ .streamName(stream)
+ .build();
+ DescribeStreamResponse describeStreamResponse = null;
+
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() &&
describeStreamResponse == null) {
+ try {
+ describeStreamResponse =
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+ } catch (ExecutionException ex) {
+ if
(AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis =
fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to describe stream " + stream + ". Backing
off for "
+ + backoffMillis + "
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+ if (describeStreamResponse == null) {
+ throw new RuntimeException("Retries exceeded
for describeStream operation - all " +
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ result.put(stream,
describeStreamResponse.streamDescription().streamARN());
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FanOutStreamInfo> registerStreamConsumer(Map<String,
String> streamArns) throws InterruptedException, ExecutionException {
Review comment:
@xiaolong-sn again I think this should accept a single
streamArn/consumerName and loop externally
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+ /** Kinesis stream name. */
+ private final String stream;
+
+ /** Kinesis stream arn. */
+ private final String streamArn;
+
+ /** Registered consumer name for the related stream. */
+ private final String consumerName;
+
+ /** Registered consumer arn for the related stream. */
+ private final String consumerArn;
+
+ /**
+ * Return the Kinesis stream name.
+ */
+ public String getStream() {
+ return stream;
+ }
+
+ /**
+ * Return the Kinesis stream arn.
+ */
+ public String getStreamArn() {
+ return streamArn;
+ }
+
+ /**
+ * Return the Kinesis consumer name for an enhanced fan-out consumer.
+ */
+ public String getConsumerName() {
+ return consumerName;
+ }
+
+ /**
+ * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+ */
+ public String getConsumerArn() {
+ return consumerArn;
+ }
+
+ /**
+ * Public constructor for fan out stream info.
+ */
+ public FanOutStreamInfo(String stream, String streamArn, String
consumerName, String consumerArn) {
+ this.stream = stream;
+ this.streamArn = streamArn;
+ this.consumerName = consumerName;
+ this.consumerArn = consumerArn;
+ }
+
+ @Override
+ public boolean equals(Object o) {
Review comment:
@xiaolong-sn do we need equals/hashCode for this object?
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -147,9 +228,31 @@ public void run() {
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
+ } finally {
+ if (this.streamInfoList != null) {
+ try {
+ deregisterStreamConsumer();
+ } catch (Throwable t) {
+ fetcherRef.stopWithError(t);
+ }
+ }
}
}
+ private void deregisterStreamConsumer() throws ExecutionException,
InterruptedException {
Review comment:
@xiaolong-sn this needs to move to `KinesisDataFetcher::shutdownFetcher`
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -22,21 +22,29 @@
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
Review comment:
@xiaolong-sn I was not expecting any changes to `ShardConsumer`, once
you move `LAZY` to `KinesisDataFetcher` this should be unchanged
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+ /** Kinesis stream name. */
+ private final String stream;
+
+ /** Kinesis stream arn. */
+ private final String streamArn;
+
+ /** Registered consumer name for the related stream. */
+ private final String consumerName;
Review comment:
@xiaolong-sn Immutable, nice +1
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -248,4 +254,29 @@ static AwsCredentialsProvider
getWebIdentityTokenFileCredentialsProvider(
public static Region getRegion(final Properties configProps) {
return
Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
}
+
+ /**
+ * Whether or not an exception is recoverable.
+ */
+ public static boolean isRecoverableException(ExecutionException e) {
+ if (!(e.getCause() instanceof SdkException)) {
+ return false;
+ }
+ SdkException ase = (SdkException) e.getCause();
+ return ase instanceof LimitExceededException || ase instanceof
ProvisionedThroughputExceededException || ase instanceof ResourceInUseException;
Review comment:
@xiaolong-sn `ase instanceof ResourceInUseException` can be replaced
with `isResourceInUse(ase)`
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
Review comment:
@xiaolong-sn I feel like we are throwing a lot of potentially useful
information away here, why not return `DescribeStreamResponse` instead and keep
it generic? The KinesisProxyV1 returns SDK objects
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when
de-/registering streams.
+ */
+public class FanOutStreamInfo {
Review comment:
@xiaolong-sn This should be renamed to `FanOutStreamConsumerInfo` please
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+ /** Kinesis stream name. */
+ private final String stream;
+
+ /** Kinesis stream arn. */
+ private final String streamArn;
+
+ /** Registered consumer name for the related stream. */
+ private final String consumerName;
+
+ /** Registered consumer arn for the related stream. */
+ private final String consumerArn;
+
+ /**
+ * Return the Kinesis stream name.
+ */
+ public String getStream() {
+ return stream;
+ }
+
+ /**
+ * Return the Kinesis stream arn.
+ */
+ public String getStreamArn() {
+ return streamArn;
+ }
+
+ /**
+ * Return the Kinesis consumer name for an enhanced fan-out consumer.
+ */
+ public String getConsumerName() {
+ return consumerName;
+ }
+
+ /**
+ * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+ */
+ public String getConsumerArn() {
+ return consumerArn;
+ }
+
+ /**
+ * Public constructor for fan out stream info.
Review comment:
@xiaolong-sn no javadoc for parameters
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
+ Map<String, String> result = new HashMap<>();
+ for (String stream : streams) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest
+ .builder()
+ .streamName(stream)
+ .build();
+ DescribeStreamResponse describeStreamResponse = null;
+
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() &&
describeStreamResponse == null) {
+ try {
+ describeStreamResponse =
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+ } catch (ExecutionException ex) {
+ if
(AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis =
fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to describe stream " + stream + ". Backing
off for "
+ + backoffMillis + "
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+ if (describeStreamResponse == null) {
+ throw new RuntimeException("Retries exceeded
for describeStream operation - all " +
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ result.put(stream,
describeStreamResponse.streamDescription().streamARN());
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FanOutStreamInfo> registerStreamConsumer(Map<String,
String> streamArns) throws InterruptedException, ExecutionException {
+
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+ String consumerName =
fanOutRecordPublisherConfiguration.getConsumerName().get();
+ List<FanOutStreamInfo> result = new ArrayList<>();
+ for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+ String stream = entry.getKey();
+ String streamArn = entry.getValue();
+ RegisterStreamConsumerRequest
registerStreamConsumerRequest = RegisterStreamConsumerRequest
Review comment:
@xiaolong-sn what happens if the stream consumer is already registered?
The flow chart in the FLIP had a call to `ListStreamConsumers` to establish if
it is already registered. Also in the case of Lazy we are meant to be
introducing a jitter delay up front. Please refer to the flowcharts in the
[FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
Review comment:
@xiaolong-sn this method is called `describeStream` yet it is describing
streams, please rename or split to two methods:
- `describeStream`
- `describeStreams`
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
+ Map<String, String> result = new HashMap<>();
+ for (String stream : streams) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest
+ .builder()
+ .streamName(stream)
+ .build();
+ DescribeStreamResponse describeStreamResponse = null;
+
+ int retryCount = 0;
+ while (retryCount <=
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() &&
describeStreamResponse == null) {
+ try {
+ describeStreamResponse =
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+ } catch (ExecutionException ex) {
+ if
(AwsV2Util.isRecoverableException(ex)) {
+ long backoffMillis =
fullJitterBackoff(
+
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(),
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(),
retryCount++);
+ LOG.warn("Got recoverable
AmazonServiceException when trying to describe stream " + stream + ". Backing
off for "
+ + backoffMillis + "
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
+ }
+ }
+ if (describeStreamResponse == null) {
+ throw new RuntimeException("Retries exceeded
for describeStream operation - all " +
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+ " retry attempts failed.");
+ }
+ result.put(stream,
describeStreamResponse.streamDescription().streamARN());
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<FanOutStreamInfo> registerStreamConsumer(Map<String,
String> streamArns) throws InterruptedException, ExecutionException {
+
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+ String consumerName =
fanOutRecordPublisherConfiguration.getConsumerName().get();
+ List<FanOutStreamInfo> result = new ArrayList<>();
+ for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+ String stream = entry.getKey();
+ String streamArn = entry.getValue();
+ RegisterStreamConsumerRequest
registerStreamConsumerRequest = RegisterStreamConsumerRequest
Review comment:
Ah I see you are using the `describeStreamConsumer` instead of list.
That is better actually. But I think you should do that first, as per the flow
chart. Just replace List with Describe. If the consumer is already registered
it will be 1 call instead of 2 per task.
Describe has a higher TPS too so that is good!
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
Review comment:
@xiaolong-sn considering we cannot batch call I would opt to just expose
a single `describeStream` method and loop externally to remove the Map and
additional complexities around that:
- `DescribeStreamResponse describeStream(final String streamName)`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]