This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c707401 [FLINK-21068][connectors/elasticsearch] Support
'connection.request-timeout','connection.timeout', 'socket.timeout' options for
elasticsearch connector.
c707401 is described below
commit c70740179f4e89139407fabafc808a4350d5c44b
Author: jinfeng <[email protected]>
AuthorDate: Mon Dec 13 19:52:37 2021 +0800
[FLINK-21068][connectors/elasticsearch] Support
'connection.request-timeout','connection.timeout', 'socket.timeout' options for
elasticsearch connector.
---
.../docs/connectors/table/elasticsearch.md | 22 ++++++++++
.../content/docs/connectors/table/elasticsearch.md | 31 ++++++++++++++
.../sink/ElasticsearchSinkBuilderBase.java | 49 +++++++++++++++++++++-
.../elasticsearch/sink/ElasticsearchWriter.java | 20 +++++++++
.../elasticsearch/sink/NetworkClientConfig.java | 26 +++++++++++-
.../table/ElasticsearchConfiguration.java | 15 +++++++
.../table/ElasticsearchConnectorOptions.java | 21 ++++++++++
.../table/ElasticsearchDynamicSink.java | 13 ++++++
.../table/ElasticsearchDynamicSinkFactoryBase.java | 6 +++
.../sink/ElasticsearchSinkBuilderBaseTest.java | 13 ++++++
.../sink/ElasticsearchWriterITCase.java | 2 +-
11 files changed, 215 insertions(+), 3 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/elasticsearch.md
b/docs/content.zh/docs/connectors/table/elasticsearch.md
index 6bafab6..061ca27 100644
--- a/docs/content.zh/docs/connectors/table/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/table/elasticsearch.md
@@ -218,6 +218,28 @@ CREATE TABLE myUserTable (
<td>添加到每个 REST 通信中的前缀字符串,例如,<code>'/v1'</code>。</td>
</tr>
<tr>
+ <td><h5>connection.request-timeout</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td>
+ </tr>
+ <tr>
+ <td><h5>connection.timeout</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>建立连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td>
+ </tr>
+ <tr>
+ <td><h5>socket.timeout</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。
+ </td>
+ </tr>
+ <tr>
<td><h5>format</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">json</td>
diff --git a/docs/content/docs/connectors/table/elasticsearch.md
b/docs/content/docs/connectors/table/elasticsearch.md
index a9c7b83..0879f41 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -198,6 +198,37 @@ Connector Options
<td>Prefix string to be added to every REST communication, e.g.,
<code>'/v1'</code>.</td>
</tr>
<tr>
+ <td><h5>connection.request-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The timeout for requesting a connection from the
connection manager.
+ The timeout must be larger than or equal to 0.
+ A timeout value of zero is interpreted as an infinite timeout.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>connection.timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The timeout for establishing a connection.
+ The timeout must be larger than or equal to 0.
+ A timeout value of zero is interpreted as an infinite timeout.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>socket.timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put
differently,
+ a maximum period of inactivity between two consecutive data packets.
+ The timeout must be larger than or equal to 0.
+ A timeout value of zero is interpreted as an infinite timeout.
+ </td>
+ </tr>
+ <tr>
<td><h5>format</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">json</td>
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index f187cb6..fe64c94 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -54,6 +54,9 @@ public abstract class ElasticsearchSinkBuilderBase<
private String username;
private String password;
private String connectionPathPrefix;
+ private Integer connectionTimeout;
+ private Integer connectionRequestTimeout;
+ private Integer socketTimeout;
protected ElasticsearchSinkBuilderBase() {}
@@ -217,6 +220,44 @@ public abstract class ElasticsearchSinkBuilderBase<
return self();
}
+ /**
+ * Sets the timeout for requesting the connection of the Elasticsearch
cluster from the
+ * connection manager.
+ *
+ * @param timeout for the connection request
+ * @return this builder
+ */
+ public B setConnectionRequestTimeout(int timeout) {
+ checkState(timeout >= 0, "Connection request timeout must be larger
than or equal to 0.");
+ this.connectionRequestTimeout = timeout;
+ return self();
+ }
+
+ /**
+ * Sets the timeout for establishing a connection of the Elasticsearch
cluster.
+ *
+ * @param timeout for the connection
+ * @return this builder
+ */
+ public B setConnectionTimeout(int timeout) {
+ checkState(timeout >= 0, "Connection timeout must be larger than or
equal to 0.");
+ this.connectionTimeout = timeout;
+ return self();
+ }
+
+ /**
+ * Sets the timeout for waiting for data or, put differently, a maximum
period inactivity
+ * between two consecutive data packets.
+ *
+ * @param timeout for the socket
+ * @return this builder
+ */
+ public B setSocketTimeout(int timeout) {
+ checkState(timeout >= 0, "Socket timeout must be larger than or equal
to 0.");
+ this.socketTimeout = timeout;
+ return self();
+ }
+
protected abstract BulkProcessorBuilderFactory
getBulkProcessorBuilderFactory();
/**
@@ -247,7 +288,13 @@ public abstract class ElasticsearchSinkBuilderBase<
private NetworkClientConfig buildNetworkClientConfig() {
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
- return new NetworkClientConfig(username, password,
connectionPathPrefix);
+ return new NetworkClientConfig(
+ username,
+ password,
+ connectionPathPrefix,
+ connectionRequestTimeout,
+ connectionTimeout,
+ socketTimeout);
}
private BulkProcessorConfig buildBulkProcessorConfig() {
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
index f984312..53723bc 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
@@ -169,6 +169,26 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN,
Void, Void> {
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
+ if (networkClientConfig.getConnectionRequestTimeout() != null
+ || networkClientConfig.getConnectionTimeout() != null
+ || networkClientConfig.getSocketTimeout() != null) {
+ builder.setRequestConfigCallback(
+ requestConfigBuilder -> {
+ if (networkClientConfig.getConnectionRequestTimeout()
!= null) {
+ requestConfigBuilder.setConnectionRequestTimeout(
+
networkClientConfig.getConnectionRequestTimeout());
+ }
+ if (networkClientConfig.getConnectionTimeout() !=
null) {
+ requestConfigBuilder.setConnectTimeout(
+
networkClientConfig.getConnectionTimeout());
+ }
+ if (networkClientConfig.getSocketTimeout() != null) {
+ requestConfigBuilder.setSocketTimeout(
+ networkClientConfig.getSocketTimeout());
+ }
+ return requestConfigBuilder;
+ });
+ }
return builder;
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
index e093008..5ae0510 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
@@ -27,14 +27,23 @@ class NetworkClientConfig implements Serializable {
@Nullable private final String username;
@Nullable private final String password;
@Nullable private final String connectionPathPrefix;
+ @Nullable private final Integer connectionRequestTimeout;
+ @Nullable private final Integer connectionTimeout;
+ @Nullable private final Integer socketTimeout;
NetworkClientConfig(
@Nullable String username,
@Nullable String password,
- @Nullable String connectionPathPrefix) {
+ @Nullable String connectionPathPrefix,
+ @Nullable Integer connectionRequestTimeout,
+ @Nullable Integer connectionTimeout,
+ @Nullable Integer socketTimeout) {
this.username = username;
this.password = password;
this.connectionPathPrefix = connectionPathPrefix;
+ this.connectionRequestTimeout = connectionRequestTimeout;
+ this.connectionTimeout = connectionTimeout;
+ this.socketTimeout = socketTimeout;
}
@Nullable
@@ -48,6 +57,21 @@ class NetworkClientConfig implements Serializable {
}
@Nullable
+ public Integer getConnectionRequestTimeout() {
+ return connectionRequestTimeout;
+ }
+
+ @Nullable
+ public Integer getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ @Nullable
+ public Integer getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @Nullable
public String getConnectionPathPrefix() {
return connectionPathPrefix;
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index ee6488e..e684b01 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -39,11 +39,14 @@ import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,6 +107,18 @@ class ElasticsearchConfiguration {
return config.getOptional(CONNECTION_PATH_PREFIX_OPTION);
}
+ public Optional<Duration> getConnectionRequestTimeout() {
+ return config.getOptional(CONNECTION_REQUEST_TIMEOUT);
+ }
+
+ public Optional<Duration> getConnectionTimeout() {
+ return config.getOptional(CONNECTION_TIMEOUT);
+ }
+
+ public Optional<Duration> getSocketTimeout() {
+ return config.getOptional(SOCKET_TIMEOUT);
+ }
+
public List<HttpHost> getHosts() {
return config.get(HOSTS_OPTION).stream()
.map(ElasticsearchConfiguration::validateAndParseHostsString)
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 5cff4ee..672f0727 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -111,6 +111,27 @@ public class ElasticsearchConnectorOptions {
.noDefaultValue()
.withDescription("Prefix string to be added to every REST
communication.");
+ public static final ConfigOption<Duration> CONNECTION_REQUEST_TIMEOUT =
+ ConfigOptions.key("connection.request-timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The timeout for requesting a connection from the
connection manager.");
+
+ public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
+ ConfigOptions.key("connection.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("The timeout for establishing a
connection.");
+
+ public static final ConfigOption<Duration> SOCKET_TIMEOUT =
+ ConfigOptions.key("socket.timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, "
+ + "a maximum period of inactivity between two consecutive data packets.");
+
public static final ConfigOption<String> FORMAT_OPTION =
ConfigOptions.key("format")
.stringType()
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index e5f94f1..0938e02 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -144,6 +144,19 @@ class ElasticsearchDynamicSink implements DynamicTableSink
{
builder.setConnectionPathPrefix(config.getPathPrefix().get());
}
+ if (config.getConnectionRequestTimeout().isPresent()) {
+ builder.setConnectionRequestTimeout(
+ Math.toIntExact(config.getConnectionRequestTimeout().get().toMillis())); // builder timeouts are in milliseconds, not seconds
+ }
+
+ if (config.getConnectionTimeout().isPresent()) {
+ builder.setConnectionTimeout(Math.toIntExact(config.getConnectionTimeout().get().toMillis())); // toIntExact fails loudly instead of wrapping on overflow
+ }
+
+ if (config.getSocketTimeout().isPresent()) {
+ builder.setSocketTimeout(Math.toIntExact(config.getSocketTimeout().get().toMillis()));
+ }
+
return SinkProvider.of(builder.build());
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
index e124ad6..8028284 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
@@ -54,12 +54,15 @@ import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.elasticsearch.common.Strings.capitalize;
@@ -209,6 +212,9 @@ abstract class ElasticsearchDynamicSinkFactoryBase
implements DynamicTableSinkFa
BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
BULK_FLUSH_BACKOFF_DELAY_OPTION,
CONNECTION_PATH_PREFIX_OPTION,
+ CONNECTION_REQUEST_TIMEOUT,
+ CONNECTION_TIMEOUT,
+ SOCKET_TIMEOUT,
FORMAT_OPTION,
DELIVERY_GUARANTEE_OPTION,
PASSWORD_OPTION,
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index e65d844..83f7871 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -77,6 +77,19 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends
ElasticsearchSinkBuild
() -> createEmptyBuilder().setHosts(new
HttpHost("localhost:3000")).build());
}
+ @Test
+ void testThrowIfSetInvalidTimeouts() {
+ assertThrows(
+ IllegalStateException.class,
+ () ->
createEmptyBuilder().setConnectionRequestTimeout(-1).build());
+ assertThrows(
+ IllegalStateException.class,
+ () -> createEmptyBuilder().setConnectionTimeout(-1).build());
+ assertThrows(
+ IllegalStateException.class,
+ () -> createEmptyBuilder().setSocketTimeout(-1).build());
+ }
+
abstract B createEmptyBuilder();
abstract B createMinimalBuilder();
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index 241ef6a..222f8b3 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -248,7 +248,7 @@ class ElasticsearchWriterITCase {
flushOnCheckpoint,
bulkProcessorConfig,
new TestBulkProcessorBuilderFactory(),
- new NetworkClientConfig(null, null, null),
+ new NetworkClientConfig(null, null, null, null, null, null),
metricGroup,
new TestMailbox());
}