This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ad066f5  [FLINK-24580][Connectors/Kinesis] Make 
ConnectTimeoutException recoverable (#17785)
ad066f5 is described below

commit ad066f551b56a686402492e33430b0bf03d85bd0
Author: John Karp <[email protected]>
AuthorDate: Tue Dec 14 17:18:53 2021 -0600

    [FLINK-24580][Connectors/Kinesis] Make ConnectTimeoutException recoverable 
(#17785)
    
    * [FLINK-24580][Connectors/Kinesis] Consider ConnectTimeoutException 
recoverable
    
    * [FLINK-24580][Connectors/Kinesis] Use ExceptionUtils to inspect exception
---
 .../connectors/kinesis/proxy/KinesisProxy.java      |  4 ++++
 .../connectors/kinesis/proxy/KinesisProxyTest.java  | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 8fdc490..641027c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.ExceptionUtils;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -420,6 +422,8 @@ public class KinesisProxy implements KinesisProxyInterface {
     protected boolean isRecoverableSdkClientException(SdkClientException ex) {
         if (ex instanceof AmazonServiceException) {
             return 
KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+        } else if (ExceptionUtils.findThrowable(ex, 
SocketTimeoutException.class).isPresent()) {
+            return true;
         }
         // customizations may decide to retry other errors, such as read 
timeouts
         return false;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 87a12ad..11da7ae 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -38,6 +38,8 @@ import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.http.HttpHost;
+import org.apache.http.conn.ConnectTimeoutException;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
@@ -47,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,6 +77,23 @@ import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 public class KinesisProxyTest {
 
     @Test
+    public void testIsRecoverableExceptionWithConnectError() throws 
UnknownHostException {
+        Properties kinesisConsumerConfig = new Properties();
+        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+        KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+        final SdkClientException ex =
+                new SdkClientException(
+                        "Unable to execute HTTP request",
+                        new ConnectTimeoutException(
+                                new java.net.SocketTimeoutException("connect 
timed out"),
+                                new 
HttpHost("kinesis.us-east-1.amazonaws.com", 443),
+                                InetAddress.getByAddress(
+                                        "kinesis.us-east-1.amazonaws.com",
+                                        new byte[] {3, 91, (byte) 171, (byte) 
253})));
+        assertTrue(kinesisProxy.isRecoverableSdkClientException(ex));
+    }
+
+    @Test
     public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
         final ProvisionedThroughputExceededException ex =
                 new ProvisionedThroughputExceededException("asdf");

Reply via email to