This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ad066f5 [FLINK-24580][Connectors/Kinesis] Make ConnectTimeoutException recoverable (#17785)
ad066f5 is described below
commit ad066f551b56a686402492e33430b0bf03d85bd0
Author: John Karp <[email protected]>
AuthorDate: Tue Dec 14 17:18:53 2021 -0600
[FLINK-24580][Connectors/Kinesis] Make ConnectTimeoutException recoverable (#17785)
* [FLINK-24580][Connectors/Kinesis] Consider ConnectTimeoutException
recoverable
* [FLINK-24580][Connectors/Kinesis] Use ExceptionUtils to inspect exception
---
.../connectors/kinesis/proxy/KinesisProxy.java | 4 ++++
.../connectors/kinesis/proxy/KinesisProxyTest.java | 21 +++++++++++++++++++++
2 files changed, 25 insertions(+)
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 8fdc490..641027c 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -22,6 +22,7 @@ import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.ExceptionUtils;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -420,6 +422,8 @@ public class KinesisProxy implements KinesisProxyInterface {
protected boolean isRecoverableSdkClientException(SdkClientException ex) {
if (ex instanceof AmazonServiceException) {
return
KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+ } else if (ExceptionUtils.findThrowable(ex,
SocketTimeoutException.class).isPresent()) {
+ return true;
}
// customizations may decide to retry other errors, such as read timeouts
return false;
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 87a12ad..11da7ae 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -38,6 +38,8 @@ import com.amazonaws.services.kinesis.model.ListShardsResult;
import
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.http.HttpHost;
+import org.apache.http.conn.ConnectTimeoutException;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
@@ -47,6 +49,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -73,6 +77,23 @@ import static org.mockito.hamcrest.MockitoHamcrest.argThat;
public class KinesisProxyTest {
@Test
+ public void testIsRecoverableExceptionWithConnectError() throws
UnknownHostException {
+ Properties kinesisConsumerConfig = new Properties();
+ kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+ KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+ final SdkClientException ex =
+ new SdkClientException(
+ "Unable to execute HTTP request",
+ new ConnectTimeoutException(
+ new java.net.SocketTimeoutException("connect timed out"),
+ new
HttpHost("kinesis.us-east-1.amazonaws.com", 443),
+ InetAddress.getByAddress(
+ "kinesis.us-east-1.amazonaws.com",
+ new byte[] {3, 91, (byte) 171, (byte)
253})));
+ assertTrue(kinesisProxy.isRecoverableSdkClientException(ex));
+ }
+
+ @Test
public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
final ProvisionedThroughputExceededException ex =
new ProvisionedThroughputExceededException("asdf");