This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 4c2fdd9 [FLINK-33260][Connectors/Kinesis] Allow user to provide a
list of recoverable exceptions (#110)
4c2fdd9 is described below
commit 4c2fdd94e27312282e11b17e12f4ec3e5adc113e
Author: Emre Kartoglu <[email protected]>
AuthorDate: Tue Nov 7 22:53:17 2023 +0000
[FLINK-33260][Connectors/Kinesis] Allow user to provide a list of
recoverable exceptions (#110)
---
docs/content.zh/docs/connectors/table/kinesis.md | 8 ++
docs/content/docs/connectors/table/kinesis.md | 8 ++
.../kinesis/config/ConsumerConfigConstants.java | 7 ++
.../connectors/kinesis/config/ExceptionConfig.java | 35 +++++++
.../kinesis/config/RecoverableErrorsConfig.java | 99 ++++++++++++++++++++
.../publisher/fanout/FanOutRecordPublisher.java | 3 +-
.../fanout/FanOutRecordPublisherConfiguration.java | 14 +++
.../publisher/fanout/FanOutShardSubscriber.java | 75 +++++++++++----
.../connectors/kinesis/util/KinesisConfigUtil.java | 7 ++
.../config/RecoverableErrorsConfigTest.java | 83 ++++++++++++++++
.../fanout/FanOutShardSubscriberTest.java | 104 +++++++++++++++++++--
.../kinesis/util/KinesisConfigUtilTest.java | 24 +++++
12 files changed, 439 insertions(+), 28 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md
b/docs/content.zh/docs/connectors/table/kinesis.md
index b9010ff..43fa007 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -680,6 +680,14 @@ Connector Options
<td>Long</td>
<td>The interval (in milliseconds) after which to consider a shard idle
for purposes of watermark generation. A positive value will allow the watermark
to progress even when some shards don't receive new records.</td>
</tr>
+ <tr>
+ <td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Fully qualified class name of a user-specified exception to retry indefinitely. Example value: `java.net.UnknownHostException`. This configuration is a zero-based array: the first exception must use index 0 and any further exceptions must use consecutive indices (`[1]`, `[2]`, ...), because parsing stops at the first missing index. Each specified exception must be a valid Throwable on the classpath; otherwise the connector fails fast during initialization.</td>
+ </tr>
<tr>
<td><h5>scan.watermark.sync.interval</h5></td>
<td>optional</td>
diff --git a/docs/content/docs/connectors/table/kinesis.md
b/docs/content/docs/connectors/table/kinesis.md
index 3ad681a..46cf1c7 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -681,6 +681,14 @@ Connector Options
<td>Long</td>
<td>The interval (in milliseconds) after which to consider a shard idle
for purposes of watermark generation. A positive value will allow the watermark
to progress even when some shards don't receive new records.</td>
</tr>
+ <tr>
+ <td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Fully qualified class name of a user-specified exception to retry indefinitely. Example value: `java.net.UnknownHostException`. This configuration is a zero-based array: the first exception must use index 0 and any further exceptions must use consecutive indices (`[1]`, `[2]`, ...), because parsing stops at the first missing index. Each specified exception must be a valid Throwable on the classpath; otherwise the connector fails fast during initialization.</td>
+ </tr>
<tr>
<td><h5>scan.watermark.sync.interval</h5></td>
<td>optional</td>
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index fff44d6..7106822 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -191,6 +191,13 @@ public class ConsumerConfigConstants extends
AWSConfigConstants {
public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
"flink.stream.registerstreamconsumer.backoff.expconst";
+ /**
+ * Key prefix for the user-provided list of exceptions to recover from. Individual entries are
+ * configured as {@code <prefix>[<index>].exception} with a zero-based index. These exceptions
+ * are retried indefinitely.
+ */
+ public static final String RECOVERABLE_EXCEPTIONS_PREFIX =
+ "flink.shard.consumer.error.recoverable";
+
/** The maximum number of deregisterStream attempts if we get a
recoverable exception. */
public static final String DEREGISTER_STREAM_RETRIES =
"flink.stream.deregisterstreamconsumer.maxretries";
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
new file mode 100644
index 0000000..d3b4080
--- /dev/null
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ * Immutable holder for a single exception type. Used for configuring recoverable exceptions.
+ */
+public class ExceptionConfig {
+    /** The class this entry was created with. */
+    private final Class<?> exceptionClass;
+
+    /**
+     * Creates an entry for the given class.
+     *
+     * @param exClass the exception class this entry describes
+     */
+    public ExceptionConfig(Class<?> exClass) {
+        exceptionClass = exClass;
+    }
+
+    /**
+     * Returns the class passed to the constructor.
+     *
+     * @return the configured exception class
+     */
+    public Class<?> getExceptionClass() {
+        return this.exceptionClass;
+    }
+}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
new file mode 100644
index 0000000..8c5c4df
--- /dev/null
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Hosts the recoverable exception configuration. Recoverable exceptions are retried indefinitely.
+ *
+ * <p>Entries are read from properties named {@code <prefix>[<index>].exception}, where the prefix
+ * is {@link ConsumerConfigConstants#RECOVERABLE_EXCEPTIONS_PREFIX} and the index starts at 0 and
+ * increases by one per entry.
+ */
+public class RecoverableErrorsConfig {
+    /**
+     * Message used when a key starts with the recoverable-exceptions prefix but no entry could be
+     * read for index 0.
+     */
+    public static final String INVALID_CONFIG_MESSAGE =
+            "Invalid config for recoverable consumer exceptions. "
+                    + "Valid config example: "
+                    + "`flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException`. "
+                    + "Your config array must use zero-based indexing as shown in the example.";
+
+    /** Format of a single entry key: prefix, zero-based index, fixed suffix. */
+    private static final String EXCEPTION_KEY_FORMAT = "%s[%d].exception";
+
+    private final List<ExceptionConfig> exceptionConfigs;
+
+    /**
+     * Creates a configuration backed by the given list (the list is stored as-is, not copied).
+     *
+     * @param exceptionConfigs the exceptions to treat as recoverable
+     */
+    public RecoverableErrorsConfig(List<ExceptionConfig> exceptionConfigs) {
+        this.exceptionConfigs = exceptionConfigs;
+    }
+
+    /**
+     * Parses the array of recoverable error configs.
+     *
+     * <p>Indices are read consecutively starting at 0; parsing stops at the first index for which
+     * no key is present.
+     *
+     * @param config connector configuration
+     * @return an Optional of RecoverableErrorsConfig; empty when no key with the recoverable
+     *     exceptions prefix is present
+     * @throws IllegalArgumentException if a configured class cannot be found or is not a {@link
+     *     Throwable}, or if keys with the prefix exist but none of them is a valid entry for index 0
+     */
+    public static Optional<RecoverableErrorsConfig> createConfigFromPropertiesOrThrow(
+            final Properties config) {
+        final List<ExceptionConfig> exConfs = new ArrayList<>();
+        int idx = 0;
+        String exceptionConfigKey = exceptionKey(idx);
+        while (config.containsKey(exceptionConfigKey)) {
+            final String exPath = config.getProperty(exceptionConfigKey);
+            exConfs.add(new ExceptionConfig(loadThrowableClass(exPath)));
+            exceptionConfigKey = exceptionKey(++idx);
+        }
+        if (idx > 0) {
+            // We processed configs successfully
+            return Optional.of(new RecoverableErrorsConfig(exConfs));
+        }
+
+        // Check if user provided wrong config suffix, so they fail faster.
+        // Properties may hold non-String keys, so guard the cast instead of assuming String.
+        for (Object key : config.keySet()) {
+            if (key instanceof String
+                    && ((String) key)
+                            .startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
+                throw new IllegalArgumentException(INVALID_CONFIG_MESSAGE);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /** Builds the property key of the entry at the given zero-based index. */
+    private static String exceptionKey(final int idx) {
+        return String.format(
+                EXCEPTION_KEY_FORMAT, ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, idx);
+    }
+
+    /** Loads the named class and verifies that it is a {@link Throwable} type. */
+    private static Class<?> loadThrowableClass(final String exPath) {
+        if (exPath == null) {
+            // getProperty returns null when the stored value is not a String
+            throw new IllegalArgumentException(INVALID_CONFIG_MESSAGE);
+        }
+        final Class<?> aClass;
+        try {
+            aClass = Class.forName(exPath);
+        } catch (ClassNotFoundException e) {
+            // keep the original failure as cause so the class loading problem stays diagnosable
+            throw new IllegalArgumentException(
+                    "Provided recoverable exception class could not be found: " + exPath, e);
+        }
+        if (!Throwable.class.isAssignableFrom(aClass)) {
+            throw new IllegalArgumentException(
+                    "Provided recoverable exception class is not a Throwable: " + exPath);
+        }
+        return aClass;
+    }
+
+    /**
+     * Tells whether this configuration holds no entries.
+     *
+     * @return true if the backing list is null or empty
+     */
+    public boolean hasNoConfig() {
+        return CollectionUtils.isEmpty(exceptionConfigs);
+    }
+
+    /**
+     * Returns the configured entries.
+     *
+     * @return the list this configuration was created with
+     */
+    public List<ExceptionConfig> getExceptionConfigs() {
+        return exceptionConfigs;
+    }
+}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index e411512..ee0623e 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -168,7 +168,8 @@ public class FanOutRecordPublisher implements
RecordPublisher {
subscribedShard.getShard().getShardId(),
kinesisProxy,
configuration.getSubscribeToShardTimeout(),
- runningSupplier);
+ runningSupplier,
+ configuration.getRecoverableErrorsConfig());
RecordPublisherRunResult result;
try {
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index cd46876..cbc5103 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -20,6 +20,7 @@ package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.util.Preconditions;
@@ -118,6 +119,9 @@ public class FanOutRecordPublisherConfiguration {
/** Exponential backoff power constant for the describe stream consumer
operation. */
private final double describeStreamConsumerExpConstant;
+ /** Recoverable error configuration. These are retried indefinitely. */
+ private final RecoverableErrorsConfig recoverableErrorsConfig;
+
/**
* Creates a FanOutRecordPublisherConfiguration.
*
@@ -318,6 +322,8 @@ public class FanOutRecordPublisherConfiguration {
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
+
+ this.recoverableErrorsConfig =
this.parseRecoverableErrorConfig(configProps);
}
// ------------------------------------------------------------------------
@@ -472,4 +478,12 @@ public class FanOutRecordPublisherConfiguration {
public Optional<String> getStreamConsumerArn(String stream) {
return Optional.ofNullable(streamConsumerArns.get(stream));
}
+
+    /**
+     * Parses the recoverable error configuration from the given properties.
+     *
+     * @param config connector configuration
+     * @return the parsed configuration, or {@code null} when the parser returns an empty Optional
+     */
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(final Properties config) {
+        final Optional<RecoverableErrorsConfig> parsed =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+        return parsed.orElse(null);
+    }
+
+    /**
+     * Returns the recoverable error configuration held by this instance.
+     *
+     * @return the recoverable error configuration; {@code null} when none was configured
+     */
+    public RecoverableErrorsConfig getRecoverableErrorsConfig() {
+        return this.recoverableErrorsConfig;
+    }
}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index a9c7c09..21fe050 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -19,9 +19,12 @@ package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import io.netty.handler.timeout.ReadTimeoutException;
@@ -38,6 +41,7 @@ import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import java.time.Duration;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
@@ -121,6 +125,8 @@ public class FanOutShardSubscriber {
private final Supplier<Boolean> runningSupplier;
+ private final RecoverableErrorsConfig recoverableErrorsConfig;
+
/**
* Create a new Fan Out Shard subscriber.
*
@@ -130,20 +136,24 @@ public class FanOutShardSubscriber {
* @param subscribeToShardTimeout A timeout when waiting for a shard
subscription to be
* established
* @param runningSupplier a callback to query if the consumer is still
running
+ * @param recoverableErrorsConfig recoverable error configuration (errors
that are retried
+ * indefinitely)
*/
FanOutShardSubscriber(
final String consumerArn,
final String shardId,
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout,
- final Supplier<Boolean> runningSupplier) {
+ final Supplier<Boolean> runningSupplier,
+ final RecoverableErrorsConfig recoverableErrorsConfig) {
this(
consumerArn,
shardId,
kinesis,
subscribeToShardTimeout,
DEFAULT_QUEUE_TIMEOUT,
- runningSupplier);
+ runningSupplier,
+ recoverableErrorsConfig);
}
/**
@@ -164,13 +174,15 @@ public class FanOutShardSubscriber {
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout,
final Duration queueWaitTimeout,
- final Supplier<Boolean> runningSupplier) {
+ final Supplier<Boolean> runningSupplier,
+ final RecoverableErrorsConfig recoverableErrorsConfig) {
this.kinesis = Preconditions.checkNotNull(kinesis);
this.consumerArn = Preconditions.checkNotNull(consumerArn);
this.shardId = Preconditions.checkNotNull(shardId);
this.subscribeToShardTimeout = subscribeToShardTimeout;
this.queueWaitTimeout = queueWaitTimeout;
this.runningSupplier = runningSupplier;
+ this.recoverableErrorsConfig = recoverableErrorsConfig;
}
/**
@@ -249,22 +261,21 @@ public class FanOutShardSubscriber {
kinesis.subscribeToShard(request, responseHandler);
- boolean subscriptionEstablished =
- waitForSubscriptionLatch.await(
+ boolean subscriptionTimedOut =
+ !waitForSubscriptionLatch.await(
subscribeToShardTimeout.toMillis(),
TimeUnit.MILLISECONDS);
- if (!subscriptionEstablished) {
+ if (subscriptionTimedOut) {
final String errorMessage =
"Timed out acquiring subscription - " + shardId + " (" +
consumerArn + ")";
LOG.error(errorMessage);
subscription.cancelSubscription();
- handleError(
- new RecoverableFanOutSubscriberException(new
TimeoutException(errorMessage)));
+ handleErrorAndRethrow(new TimeoutException(errorMessage));
}
Throwable throwable = exception.get();
if (throwable != null) {
- handleError(throwable);
+ handleErrorAndRethrow(throwable);
}
LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
@@ -282,7 +293,7 @@ public class FanOutShardSubscriber {
*
* @param throwable the exception that has occurred
*/
- private void handleError(final Throwable throwable) throws
FanOutSubscriberException {
+ private void handleErrorAndRethrow(final Throwable throwable) throws
FanOutSubscriberException {
Throwable cause;
if (throwable instanceof CompletionException || throwable instanceof
ExecutionException) {
cause = throwable.getCause();
@@ -302,16 +313,10 @@ public class FanOutShardSubscriber {
throw new FanOutSubscriberInterruptedException(throwable);
} else if (cause instanceof FanOutSubscriberException) {
throw (FanOutSubscriberException) cause;
- } else if (cause instanceof ReadTimeoutException) {
- // ReadTimeoutException occurs naturally under backpressure
scenarios when full batches
- // take longer to
- // process than standard read timeout (default 30s). Recoverable
exceptions are intended
- // to be retried
- // indefinitely to avoid system degradation under backpressure.
The EFO connection
- // (subscription) to Kinesis
- // is closed, and reacquired once the queue of records has been
processed.
+ } else if (isDefinedAsRecoverable(cause)) {
throw new RecoverableFanOutSubscriberException(cause);
} else {
+ // All other errors are treated as retryable
throw new RetryableFanOutSubscriberException(cause);
}
}
@@ -329,6 +334,38 @@ public class FanOutShardSubscriber {
return false;
}
+    /**
+     * Decides whether the given error is to be recovered from, i.e. retried indefinitely.
+     *
+     * @param cause the error to classify
+     * @return true for the fixed set of recoverable types, or when the user configured the error
+     *     as recoverable
+     */
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // Non-customisable list of exceptions that should be recovered (retried indefinitely).
+        // ReadTimeoutException occurs naturally under backpressure scenarios when full batches
+        // take longer to process than standard read timeout (default 30s). Recoverable exceptions
+        // are intended to be retried indefinitely to avoid system degradation under backpressure.
+        // The EFO connection (subscription) to Kinesis is closed, and reacquired once the queue of
+        // records has been processed.
+        final boolean alwaysRecoverable =
+                cause instanceof ReadTimeoutException || cause instanceof TimeoutException;
+        return alwaysRecoverable || isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * Checks the given error against every exception class the user configured as recoverable.
+     * The lookup is delegated to {@code ExceptionUtils.findThrowable}, so a configured exception
+     * that is wrapped inside {@code cause} is matched as well.
+     *
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : this.recoverableErrorsConfig.getExceptionConfigs()) {
+            Optional<Throwable> throwable =
+                    ExceptionUtils.findThrowable(
+                            cause, (Class<Throwable>) config.getExceptionClass());
+            // Only a match ends the search; a miss must fall through to the next configured
+            // class, otherwise every entry after the first one would be ignored.
+            if (throwable.isPresent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
/**
* Once the subscription is open, records will be delivered to the {@link
BlockingQueue}. Queue
* capacity is hardcoded to 1 record, the queue is used solely to separate
consumption and
@@ -386,7 +423,7 @@ public class FanOutShardSubscriber {
// The subscription is complete, but the shard might not be,
so we return incomplete
return INCOMPLETE;
} else {
- handleError(subscriptionEvent.getThrowable());
+ handleErrorAndRethrow(subscriptionEvent.getThrowable());
result = INCOMPLETE;
break;
}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 5cf8579..1dcd4e0 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -29,6 +29,7 @@ import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
import
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@@ -322,6 +323,12 @@ public class KinesisConfigUtil {
config,
ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
"Invalid value given for EFO HTTP client max concurrency. Must
be positive.");
+
+ validateRecoverableErrorConfig(config);
+ }
+
+ private static void validateRecoverableErrorConfig(Properties config) {
+ RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
}
/**
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
new file mode 100644
index 0000000..c1cb181
--- /dev/null
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RecoverableErrorsConfig}. */
+public class RecoverableErrorsConfigTest {
+
+    /** Two consecutive valid entries are parsed and returned in index order. */
+    @Test
+    public void testParseConfigFromProperties() {
+        Properties props =
+                singleEntry(
+                        "flink.shard.consumer.error.recoverable[0].exception",
+                        "java.net.UnknownHostException");
+        props.setProperty(
+                "flink.shard.consumer.error.recoverable[1].exception",
+                "java.lang.IllegalArgumentException");
+
+        Optional<RecoverableErrorsConfig> parsed =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(props);
+
+        assertTrue(parsed.isPresent());
+        RecoverableErrorsConfig errorsConfig = parsed.get();
+        assertFalse(errorsConfig.hasNoConfig());
+        assertThat(errorsConfig.getExceptionConfigs().size()).isEqualTo(2);
+        assertThat(errorsConfig.getExceptionConfigs().get(0).getExceptionClass())
+                .isEqualTo(java.net.UnknownHostException.class);
+        assertThat(errorsConfig.getExceptionConfigs().get(1).getExceptionClass())
+                .isEqualTo(java.lang.IllegalArgumentException.class);
+    }
+
+    /** Empty properties yield an empty Optional. */
+    @Test
+    public void testReturnEmptyWhenConfigNotFound() {
+        Properties empty = new Properties();
+        Optional<RecoverableErrorsConfig> parsed =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(empty);
+        assertFalse(parsed.isPresent());
+    }
+
+    /** A class that exists but is not a Throwable is rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassIsNotThrowable() {
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(
+                singleEntry(
+                        "flink.shard.consumer.error.recoverable[0].exception",
+                        "java.util.Properties"));
+    }
+
+    /** A class name that cannot be resolved is rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassCanNotBeFound() {
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(
+                singleEntry(
+                        "flink.shard.consumer.error.recoverable[0].exception",
+                        "made.up.TestClass"));
+    }
+
+    /** A key with the right prefix but a wrong suffix is rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedConfigSuffixIsNotValid() {
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(
+                singleEntry(
+                        "flink.shard.consumer.error.recoverable[0].exceptionnm",
+                        "java.lang.Exception"));
+    }
+
+    /** Creates properties that contain exactly the given key/value pair. */
+    private static Properties singleEntry(String key, String value) {
+        Properties props = new Properties();
+        props.setProperty(key, value);
+        return props;
+    }
+}
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 1201819..923d0ae 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -17,6 +17,8 @@
package
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
import
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
@@ -30,6 +32,7 @@ import org.junit.rules.ExpectedException;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import java.time.Duration;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +60,86 @@ public class FanOutShardSubscriberTest {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
+
+ software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
+
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event
-> {});
+ }
+
+    /**
+     * An error whose type the user configured as recoverable must reach the consumer as a
+     * recoverable subscriber exception.
+     */
+    @Test
+    public void testRecoverableErrorThrownToConsumerWhenUserConfiguresExceptionToBeRecoverable()
+            throws Exception {
+        thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("java.lang.ArithmeticException");
+
+        RecoverableErrorsConfig arithmeticIsRecoverable =
+                new RecoverableErrorsConfig(
+                        Collections.singletonList(new ExceptionConfig(ArithmeticException.class)));
+        SubscriptionErrorKinesisAsyncV2 failingKinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new ArithmeticException());
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        failingKinesis,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        () -> true,
+                        arithmeticIsRecoverable);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+    }
+
+    /**
+     * A configured exception that arrives wrapped in another exception must still reach the
+     * consumer as a recoverable subscriber exception.
+     */
+    @Test
+    public void testRecoverableErrorThrownToConsumerWhenUserConfiguredExceptionIsWrapped()
+            throws Exception {
+        thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("java.lang.ArithmeticException");
+
+        RecoverableErrorsConfig arithmeticIsRecoverable =
+                new RecoverableErrorsConfig(
+                        Collections.singletonList(new ExceptionConfig(ArithmeticException.class)));
+        RuntimeException wrapped = new RuntimeException(new ArithmeticException());
+        SubscriptionErrorKinesisAsyncV2 failingKinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(wrapped);
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        failingKinesis,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        () -> true,
+                        arithmeticIsRecoverable);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+    }
+
+ @Test
+ public void
testRetryableErrorThrownToConsumerWhenUserConfiguredExceptionIsNotThrown()
+ throws Exception {
+
thrown.expect(FanOutShardSubscriber.RetryableFanOutSubscriberException.class);
+ thrown.expectMessage("java.lang.ArithmeticException");
+
+ SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+ FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+ new RuntimeException(new ArithmeticException()));
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+ () -> true,
+ new RecoverableErrorsConfig(
+ Collections.singletonList(
+ new
ExceptionConfig(java.net.UnknownHostException.class))));
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -79,7 +161,8 @@ public class FanOutShardSubscriberTest {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -100,7 +183,8 @@ public class FanOutShardSubscriberTest {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
software.amazon.awssdk.services.kinesis.model.StartingPosition
startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -123,7 +207,8 @@ public class FanOutShardSubscriberTest {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event
-> {});
@@ -140,7 +225,8 @@ public class FanOutShardSubscriberTest {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
RecordPublisherRunResult result =
@@ -159,7 +245,7 @@ public class FanOutShardSubscriberTest {
FanOutShardSubscriber subscriber =
new FanOutShardSubscriber(
- "consumerArn", "shardId", kinesis,
Duration.ofMillis(1), () -> true);
+ "consumerArn", "shardId", kinesis,
Duration.ofMillis(1), () -> true, null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event
-> {});
@@ -180,7 +266,8 @@ public class FanOutShardSubscriberTest {
kinesis,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
Duration.ofMillis(100),
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(
@@ -207,7 +294,8 @@ public class FanOutShardSubscriberTest {
"shardId",
unboundedStream,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- run::get);
+ run::get,
+ null);
final AtomicInteger batches = new AtomicInteger(0);
diff --git
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 66e36b0..dfe9cd9 100644
---
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -1021,4 +1021,28 @@ public class KinesisConfigUtilTest {
.containsKey("aws.kinesis.client.user-agent-prefix")
.hasSize(2);
}
+
+    /** A recoverable exception class that cannot be found must fail consumer validation. */
+    @Test
+    public void testInvalidCustomRecoverableErrorConfiguration() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(
+                "Provided recoverable exception class could not be found: com.NonExistentExceptionClass");
+
+        final String firstEntryKey =
+                ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + "[0].exception";
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(firstEntryKey, "com.NonExistentExceptionClass");
+
+        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+    }
+
+    /** A recoverable exception class that exists on the classpath must pass consumer validation. */
+    @Test
+    public void testValidCustomRecoverableErrorConfiguration() {
+        final String firstEntryKey =
+                ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + "[0].exception";
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(firstEntryKey, "java.net.UnknownHostException");
+
+        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+    }
}