This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c2fdd9  [FLINK-33260][Connectors/Kinesis] Allow user to provide a 
list of recoverable exceptions (#110)
4c2fdd9 is described below

commit 4c2fdd94e27312282e11b17e12f4ec3e5adc113e
Author: Emre Kartoglu <[email protected]>
AuthorDate: Tue Nov 7 22:53:17 2023 +0000

    [FLINK-33260][Connectors/Kinesis] Allow user to provide a list of 
recoverable exceptions (#110)
---
 docs/content.zh/docs/connectors/table/kinesis.md   |   8 ++
 docs/content/docs/connectors/table/kinesis.md      |   8 ++
 .../kinesis/config/ConsumerConfigConstants.java    |   7 ++
 .../connectors/kinesis/config/ExceptionConfig.java |  35 +++++++
 .../kinesis/config/RecoverableErrorsConfig.java    |  99 ++++++++++++++++++++
 .../publisher/fanout/FanOutRecordPublisher.java    |   3 +-
 .../fanout/FanOutRecordPublisherConfiguration.java |  14 +++
 .../publisher/fanout/FanOutShardSubscriber.java    |  75 +++++++++++----
 .../connectors/kinesis/util/KinesisConfigUtil.java |   7 ++
 .../config/RecoverableErrorsConfigTest.java        |  83 ++++++++++++++++
 .../fanout/FanOutShardSubscriberTest.java          | 104 +++++++++++++++++++--
 .../kinesis/util/KinesisConfigUtilTest.java        |  24 +++++
 12 files changed, 439 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/kinesis.md 
b/docs/content.zh/docs/connectors/table/kinesis.md
index b9010ff..43fa007 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -680,6 +680,14 @@ Connector Options
       <td>Long</td>
       <td>The interval (in milliseconds) after which to consider a shard idle 
for purposes of watermark generation. A positive value will allow the watermark 
to progress even when some shards don't receive new records.</td>
     </tr>
+    <tr>
+      <td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>User-specified Exception to retry indefinitely. Example value: 
`java.net.UnknownHostException`. This configuration is a zero-based array. As 
such, the specified exceptions must start with index 0. Specified exceptions 
must be valid Throwables in classpath, or connector will fail to initialize and 
fail fast.</td>
+    </tr> 
     <tr>
       <td><h5>scan.watermark.sync.interval</h5></td>
       <td>optional</td>
diff --git a/docs/content/docs/connectors/table/kinesis.md 
b/docs/content/docs/connectors/table/kinesis.md
index 3ad681a..46cf1c7 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -681,6 +681,14 @@ Connector Options
       <td>Long</td>
       <td>The interval (in milliseconds) after which to consider a shard idle 
for purposes of watermark generation. A positive value will allow the watermark 
to progress even when some shards don't receive new records.</td>
     </tr>
+   <tr>
+      <td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>User-specified Exception to retry indefinitely. Example value: 
`java.net.UnknownHostException`. This configuration is a zero-based array. As 
such, the specified exceptions must start with index 0. Specified exceptions 
must be valid Throwables in classpath, or connector will fail to initialize and 
fail fast.</td>
+    </tr> 
     <tr>
       <td><h5>scan.watermark.sync.interval</h5></td>
       <td>optional</td>
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index fff44d6..7106822 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -191,6 +191,13 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
     public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
             "flink.stream.registerstreamconsumer.backoff.expconst";
 
+    /**
+     * The user-provided list of exceptions to recover from. These exceptions 
are retried
+     * indefinitely.
+     */
+    public static final String RECOVERABLE_EXCEPTIONS_PREFIX =
+            "flink.shard.consumer.error.recoverable";
+
     /** The maximum number of deregisterStream attempts if we get a 
recoverable exception. */
     public static final String DEREGISTER_STREAM_RETRIES =
             "flink.stream.deregisterstreamconsumer.maxretries";
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
new file mode 100644
index 0000000..d3b4080
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/**
+ * Helper class to hold information/behaviour about Exceptions. Used for 
configuring recoverable
+ * exceptions.
+ */
+public class ExceptionConfig {
+    private final Class<?> exceptionClass;
+
+    public ExceptionConfig(Class<?> exClass) {
+        this.exceptionClass = exClass;
+    }
+
+    public Class<?> getExceptionClass() {
+        return exceptionClass;
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
new file mode 100644
index 0000000..8c5c4df
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Hosts the recoverable exception configuration. Recoverable exceptions are 
retried indefinitely.
+ */
+public class RecoverableErrorsConfig {
+    public static final String INVALID_CONFIG_MESSAGE =
+            "Invalid config for recoverable consumer exceptions. "
+                    + "Valid config example: "
+                    + 
"`flink.shard.consumer.error.recoverable[0].exception=net.java.UnknownHostException`.
 "
+                    + "Your config array must use zero-based indexing as shown 
in the example.";
+
+    /**
+     * Parses the array of recoverable error configs.
+     *
+     * @param config connector configuration
+     * @return an Optional of RecoverableErrorsConfig
+     */
+    public static Optional<RecoverableErrorsConfig> 
createConfigFromPropertiesOrThrow(
+            final Properties config) {
+        List<ExceptionConfig> exConfs = new ArrayList<>();
+        int idx = 0;
+        String exceptionConfigKey =
+                String.format(
+                        "%s[%d].exception",
+                        ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, 
idx);
+        while (config.containsKey(exceptionConfigKey)) {
+            String exPath = config.getProperty(exceptionConfigKey);
+            try {
+                Class<?> aClass = Class.forName(exPath);
+                if (!Throwable.class.isAssignableFrom(aClass)) {
+                    throw new ClassCastException();
+                }
+                exConfs.add(new ExceptionConfig(aClass));
+            } catch (ClassCastException e) {
+                throw new IllegalArgumentException(
+                        "Provided recoverable exception class is not a 
Throwable: " + exPath);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        "Provided recoverable exception class could not be 
found: " + exPath);
+            }
+            exceptionConfigKey =
+                    String.format(
+                            "%s[%d].exception",
+                            
ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, ++idx);
+        }
+        if (idx > 0) {
+            // We processed configs successfully
+            return Optional.of(new RecoverableErrorsConfig(exConfs));
+        }
+
+        // Check if user provided wrong config suffix, so they fail faster
+        for (Object key : config.keySet()) {
+            if (((String) 
key).startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
+                throw new 
IllegalArgumentException(RecoverableErrorsConfig.INVALID_CONFIG_MESSAGE);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private final List<ExceptionConfig> exceptionConfigs;
+
+    public RecoverableErrorsConfig(List<ExceptionConfig> exceptionConfigs) {
+        this.exceptionConfigs = exceptionConfigs;
+    }
+
+    public boolean hasNoConfig() {
+        return CollectionUtils.isEmpty(exceptionConfigs);
+    }
+
+    public List<ExceptionConfig> getExceptionConfigs() {
+        return exceptionConfigs;
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index e411512..ee0623e 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -168,7 +168,8 @@ public class FanOutRecordPublisher implements 
RecordPublisher {
                         subscribedShard.getShard().getShardId(),
                         kinesisProxy,
                         configuration.getSubscribeToShardTimeout(),
-                        runningSupplier);
+                        runningSupplier,
+                        configuration.getRecoverableErrorsConfig());
         RecordPublisherRunResult result;
 
         try {
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index cd46876..cbc5103 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -118,6 +119,9 @@ public class FanOutRecordPublisherConfiguration {
     /** Exponential backoff power constant for the describe stream consumer 
operation. */
     private final double describeStreamConsumerExpConstant;
 
+    /** Recoverable error configuration. These are retried indefinitely. */
+    private final RecoverableErrorsConfig recoverableErrorsConfig;
+
     /**
      * Creates a FanOutRecordPublisherConfiguration.
      *
@@ -318,6 +322,8 @@ public class FanOutRecordPublisherConfiguration {
                         .orElse(
                                 ConsumerConfigConstants
                                         
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
+
+        this.recoverableErrorsConfig = 
this.parseRecoverableErrorConfig(configProps);
     }
 
     // ------------------------------------------------------------------------
@@ -472,4 +478,12 @@ public class FanOutRecordPublisherConfiguration {
     public Optional<String> getStreamConsumerArn(String stream) {
         return Optional.ofNullable(streamConsumerArns.get(stream));
     }
+
+    public RecoverableErrorsConfig parseRecoverableErrorConfig(final 
Properties config) {
+        return 
RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config).orElse(null);
+    }
+
+    public RecoverableErrorsConfig getRecoverableErrorsConfig() {
+        return recoverableErrorsConfig;
+    }
 }
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index a9c7c09..21fe050 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -19,9 +19,12 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import 
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import io.netty.handler.timeout.ReadTimeoutException;
@@ -38,6 +41,7 @@ import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
 import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
 
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
@@ -121,6 +125,8 @@ public class FanOutShardSubscriber {
 
     private final Supplier<Boolean> runningSupplier;
 
+    private final RecoverableErrorsConfig recoverableErrorsConfig;
+
     /**
      * Create a new Fan Out Shard subscriber.
      *
@@ -130,20 +136,24 @@ public class FanOutShardSubscriber {
      * @param subscribeToShardTimeout A timeout when waiting for a shard 
subscription to be
      *     established
      * @param runningSupplier a callback to query if the consumer is still 
running
+     * @param recoverableErrorsConfig recoverable error configuration (errors 
that are retried
+     *     indefinitely)
      */
     FanOutShardSubscriber(
             final String consumerArn,
             final String shardId,
             final KinesisProxyAsyncV2Interface kinesis,
             final Duration subscribeToShardTimeout,
-            final Supplier<Boolean> runningSupplier) {
+            final Supplier<Boolean> runningSupplier,
+            final RecoverableErrorsConfig recoverableErrorsConfig) {
         this(
                 consumerArn,
                 shardId,
                 kinesis,
                 subscribeToShardTimeout,
                 DEFAULT_QUEUE_TIMEOUT,
-                runningSupplier);
+                runningSupplier,
+                recoverableErrorsConfig);
     }
 
     /**
@@ -164,13 +174,15 @@ public class FanOutShardSubscriber {
             final KinesisProxyAsyncV2Interface kinesis,
             final Duration subscribeToShardTimeout,
             final Duration queueWaitTimeout,
-            final Supplier<Boolean> runningSupplier) {
+            final Supplier<Boolean> runningSupplier,
+            final RecoverableErrorsConfig recoverableErrorsConfig) {
         this.kinesis = Preconditions.checkNotNull(kinesis);
         this.consumerArn = Preconditions.checkNotNull(consumerArn);
         this.shardId = Preconditions.checkNotNull(shardId);
         this.subscribeToShardTimeout = subscribeToShardTimeout;
         this.queueWaitTimeout = queueWaitTimeout;
         this.runningSupplier = runningSupplier;
+        this.recoverableErrorsConfig = recoverableErrorsConfig;
     }
 
     /**
@@ -249,22 +261,21 @@ public class FanOutShardSubscriber {
 
         kinesis.subscribeToShard(request, responseHandler);
 
-        boolean subscriptionEstablished =
-                waitForSubscriptionLatch.await(
+        boolean subscriptionTimedOut =
+                !waitForSubscriptionLatch.await(
                         subscribeToShardTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
 
-        if (!subscriptionEstablished) {
+        if (subscriptionTimedOut) {
             final String errorMessage =
                     "Timed out acquiring subscription - " + shardId + " (" + 
consumerArn + ")";
             LOG.error(errorMessage);
             subscription.cancelSubscription();
-            handleError(
-                    new RecoverableFanOutSubscriberException(new 
TimeoutException(errorMessage)));
+            handleErrorAndRethrow(new TimeoutException(errorMessage));
         }
 
         Throwable throwable = exception.get();
         if (throwable != null) {
-            handleError(throwable);
+            handleErrorAndRethrow(throwable);
         }
 
         LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
@@ -282,7 +293,7 @@ public class FanOutShardSubscriber {
      *
      * @param throwable the exception that has occurred
      */
-    private void handleError(final Throwable throwable) throws 
FanOutSubscriberException {
+    private void handleErrorAndRethrow(final Throwable throwable) throws 
FanOutSubscriberException {
         Throwable cause;
         if (throwable instanceof CompletionException || throwable instanceof 
ExecutionException) {
             cause = throwable.getCause();
@@ -302,16 +313,10 @@ public class FanOutShardSubscriber {
             throw new FanOutSubscriberInterruptedException(throwable);
         } else if (cause instanceof FanOutSubscriberException) {
             throw (FanOutSubscriberException) cause;
-        } else if (cause instanceof ReadTimeoutException) {
-            // ReadTimeoutException occurs naturally under backpressure 
scenarios when full batches
-            // take longer to
-            // process than standard read timeout (default 30s). Recoverable 
exceptions are intended
-            // to be retried
-            // indefinitely to avoid system degradation under backpressure. 
The EFO connection
-            // (subscription) to Kinesis
-            // is closed, and reacquired once the queue of records has been 
processed.
+        } else if (isDefinedAsRecoverable(cause)) {
             throw new RecoverableFanOutSubscriberException(cause);
         } else {
+            // All other errors are treated as retryable
             throw new RetryableFanOutSubscriberException(cause);
         }
     }
@@ -329,6 +334,38 @@ public class FanOutShardSubscriber {
         return false;
     }
 
+    private boolean isDefinedAsRecoverable(Throwable cause) {
+        // non-customisable list of exceptions that should be recovered 
(retried indefinitely).
+        if (cause instanceof ReadTimeoutException || cause instanceof 
TimeoutException) {
+            // ReadTimeoutException occurs naturally under backpressure 
scenarios when full batches
+            // take longer to
+            // process than standard read timeout (default 30s). Recoverable 
exceptions are intended
+            // to be retried
+            // indefinitely to avoid system degradation under backpressure. 
The EFO connection
+            // (subscription) to Kinesis
+            // is closed, and reacquired once the queue of records has been 
processed.
+            return true;
+        }
+        return isConfiguredAsRecoverable(cause);
+    }
+
+    /**
+     * @param cause Throwable on which to base our exception search
+     * @return true if the input Throwable is configured as a Recoverable 
Error by the user
+     */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || 
this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : 
this.recoverableErrorsConfig.getExceptionConfigs()) {
+            Optional<Throwable> throwable =
+                    ExceptionUtils.findThrowable(
+                            cause, (Class<Throwable>) 
config.getExceptionClass());
+            return throwable.isPresent();
+        }
+        return false;
+    }
+
     /**
      * Once the subscription is open, records will be delivered to the {@link 
BlockingQueue}. Queue
      * capacity is hardcoded to 1 record, the queue is used solely to separate 
consumption and
@@ -386,7 +423,7 @@ public class FanOutShardSubscriber {
                 // The subscription is complete, but the shard might not be, 
so we return incomplete
                 return INCOMPLETE;
             } else {
-                handleError(subscriptionEvent.getThrowable());
+                handleErrorAndRethrow(subscriptionEvent.getThrowable());
                 result = INCOMPLETE;
                 break;
             }
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 5cf8579..1dcd4e0 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
 
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 
@@ -322,6 +323,12 @@ public class KinesisConfigUtil {
                 config,
                 ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
                 "Invalid value given for EFO HTTP client max concurrency. Must 
be positive.");
+
+        validateRecoverableErrorConfig(config);
+    }
+
+    private static void validateRecoverableErrorConfig(Properties config) {
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
     }
 
     /**
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
new file mode 100644
index 0000000..c1cb181
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RecoverableErrorsConfig}. */
+public class RecoverableErrorsConfigTest {
+
+    @Test
+    public void testParseConfigFromProperties() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception",
+                "java.net.UnknownHostException");
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[1].exception",
+                "java.lang.IllegalArgumentException");
+        Optional<RecoverableErrorsConfig> recoverableErrorsConfigOptional =
+                
RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+        assertTrue(recoverableErrorsConfigOptional.isPresent());
+        RecoverableErrorsConfig recoverableErrorsConfig = 
recoverableErrorsConfigOptional.get();
+        assertFalse(recoverableErrorsConfig.hasNoConfig());
+        
assertThat(recoverableErrorsConfig.getExceptionConfigs().size()).isEqualTo(2);
+        
assertThat(recoverableErrorsConfig.getExceptionConfigs().get(0).getExceptionClass())
+                .isEqualTo(java.net.UnknownHostException.class);
+        
assertThat(recoverableErrorsConfig.getExceptionConfigs().get(1).getExceptionClass())
+                .isEqualTo(java.lang.IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testReturnEmptyWhenConfigNotFound() {
+        Optional<RecoverableErrorsConfig> recoverableErrorsConfigOptional =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(new 
Properties());
+        assertFalse(recoverableErrorsConfigOptional.isPresent());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassIsNotThrowable() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception", 
"java.util.Properties");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassCanNotBeFound() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception", 
"made.up.TestClass");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedConfigSuffixIsNotValid() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exceptionnm", 
"java.lang.Exception");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 1201819..923d0ae 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -17,6 +17,8 @@
 
 package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
 
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import 
org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
@@ -30,6 +32,7 @@ import org.junit.rules.ExpectedException;
 import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -57,7 +60,86 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         errorKinesisV2,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        () -> true);
+                        () -> true,
+                        null);
+
+        software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
+                
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
+    }
+
+    @Test
+    public void 
testRecoverableErrorThrownToConsumerWhenUserConfiguresExceptionToBeRecoverable()
+            throws Exception {
+        
thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("java.lang.ArithmeticException");
+
+        SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new ArithmeticException());
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        () -> true,
+                        new RecoverableErrorsConfig(
+                                Collections.singletonList(
+                                        new 
ExceptionConfig(ArithmeticException.class))));
+
+        software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
+                
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
+    }
+
+    @Test
+    public void 
testRecoverableErrorThrownToConsumerWhenUserConfiguredExceptionIsWrapped()
+            throws Exception {
+        
thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+        thrown.expectMessage("java.lang.ArithmeticException");
+
+        SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new RuntimeException(new ArithmeticException()));
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        () -> true,
+                        new RecoverableErrorsConfig(
+                                Collections.singletonList(
+                                        new 
ExceptionConfig(ArithmeticException.class))));
+
+        software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
+                
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
+    }
+
+    @Test
+    public void 
testRetryableErrorThrownToConsumerWhenUserConfiguredExceptionIsNotThrown()
+            throws Exception {
+        
thrown.expect(FanOutShardSubscriber.RetryableFanOutSubscriberException.class);
+        thrown.expectMessage("java.lang.ArithmeticException");
+
+        SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new RuntimeException(new ArithmeticException()));
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+                        () -> true,
+                        new RecoverableErrorsConfig(
+                                Collections.singletonList(
+                                        new 
ExceptionConfig(java.net.UnknownHostException.class))));
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
                 
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -79,7 +161,8 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         errorKinesisV2,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        () -> true);
+                        () -> true,
+                        null);
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
                 
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -100,7 +183,8 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         errorKinesisV2,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        () -> true);
+                        () -> true,
+                        null);
 
         software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
                 
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -123,7 +207,8 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         errorKinesisV2,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        () -> true);
+                        () -> true,
+                        null);
 
         StartingPosition startingPosition = StartingPosition.builder().build();
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
@@ -140,7 +225,8 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         errorKinesisV2,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        () -> true);
+                        () -> true,
+                        null);
 
         StartingPosition startingPosition = StartingPosition.builder().build();
         RecordPublisherRunResult result =
@@ -159,7 +245,7 @@ public class FanOutShardSubscriberTest {
 
         FanOutShardSubscriber subscriber =
                 new FanOutShardSubscriber(
-                        "consumerArn", "shardId", kinesis, 
Duration.ofMillis(1), () -> true);
+                        "consumerArn", "shardId", kinesis, 
Duration.ofMillis(1), () -> true, null);
 
         StartingPosition startingPosition = StartingPosition.builder().build();
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
@@ -180,7 +266,8 @@ public class FanOutShardSubscriberTest {
                         kinesis,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
                         Duration.ofMillis(100),
-                        () -> true);
+                        () -> true,
+                        null);
 
         StartingPosition startingPosition = StartingPosition.builder().build();
         subscriber.subscribeToShardAndConsumeRecords(
@@ -207,7 +294,8 @@ public class FanOutShardSubscriberTest {
                         "shardId",
                         unboundedStream,
                         DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
-                        run::get);
+                        run::get,
+                        null);
 
         final AtomicInteger batches = new AtomicInteger(0);
 
diff --git 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 66e36b0..dfe9cd9 100644
--- 
a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ 
b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -1021,4 +1021,28 @@ public class KinesisConfigUtilTest {
                 .containsKey("aws.kinesis.client.user-agent-prefix")
                 .hasSize(2);
     }
+
+    @Test
+    public void testInvalidCustomRecoverableErrorConfiguration() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(
+                "Provided recoverable exception class could not be found: 
com.NonExistentExceptionClass");
+
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(
+                ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + 
"[0].exception",
+                "com.NonExistentExceptionClass");
+
+        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+    }
+
+    @Test
+    public void testValidCustomRecoverableErrorConfiguration() {
+        Properties testConfig = TestUtils.getStandardProperties();
+        testConfig.setProperty(
+                ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + 
"[0].exception",
+                "java.net.UnknownHostException");
+
+        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+    }
 }


Reply via email to