This is an automated email from the ASF dual-hosted git repository.
junrui pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push:
new 04c9893 [FLINK-38371][Connectors/ElasticSearch] Add
`sink.retry-on-conflicts` option to Elasticsearch Sink (#132)
04c9893 is described below
commit 04c98933da231ab11a33c19968a1fe285d205276
Author: Liu Jiangang <[email protected]>
AuthorDate: Tue Mar 10 20:03:26 2026 +0800
[FLINK-38371][Connectors/ElasticSearch] Add `sink.retry-on-conflicts`
option to Elasticsearch Sink (#132)
Co-authored-by: garyjgliu <[email protected]>
---
.../connector/elasticsearch/table/ElasticsearchConfiguration.java | 5 +++++
.../elasticsearch/table/ElasticsearchConnectorOptions.java | 7 +++++++
.../connector/elasticsearch/table/ElasticsearchDynamicSink.java | 5 +++++
.../elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java | 1 +
.../connector/elasticsearch/table/RowElasticsearchEmitter.java | 6 ++++++
5 files changed, 24 insertions(+)
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index ac0c291..6c74200 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -46,6 +46,7 @@ import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICTS;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
@@ -130,6 +131,10 @@ public class ElasticsearchConfiguration {
return config.getOptional(SINK_PARALLELISM);
}
+ public int getRetryOnConflict() {
+ return config.get(RETRY_ON_CONFLICTS);
+ }
+
/**
* Parse Hosts String to list.
*
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 10ea0ae..f34cb87 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -145,4 +145,11 @@ public class ElasticsearchConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when
committing.");
+
+ public static final ConfigOption<Integer> RETRY_ON_CONFLICTS =
+ ConfigOptions.key("sink.retry-on-conflicts")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The number of retry when conflicts with
concurrent requests.");
}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index 0fd389b..d6d04a6 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -62,6 +62,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
final ElasticsearchSinkBuilderSupplier<RowData> builderSupplier;
@Nullable final String documentType;
final boolean isDynamicIndexWithSystemTime;
+ final int retryOnConflict;
ElasticsearchDynamicSink(
EncodingFormat<SerializationSchema<RowData>> format,
@@ -71,6 +72,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
String summaryString,
ElasticsearchSinkBuilderSupplier<RowData> builderSupplier,
@Nullable String documentType,
+ int retryOnConflict,
ZoneId localTimeZoneId) {
this.format = checkNotNull(format);
this.physicalRowDataType = checkNotNull(physicalRowDataType);
@@ -79,6 +81,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
this.summaryString = checkNotNull(summaryString);
this.builderSupplier = checkNotNull(builderSupplier);
this.documentType = documentType;
+ this.retryOnConflict = retryOnConflict;
this.localTimeZoneId = localTimeZoneId;
this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
}
@@ -127,6 +130,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
format,
XContentType.JSON,
documentType,
+ retryOnConflict,
createKeyExtractor());
ElasticsearchSinkBuilderBase<RowData, ? extends
ElasticsearchSinkBuilderBase> builder =
@@ -187,6 +191,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
summaryString,
builderSupplier,
documentType,
+ retryOnConflict,
localTimeZoneId);
}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index 0395eaf..2686e2e 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -128,6 +128,7 @@ abstract class ElasticsearchDynamicTableFactoryBase
capitalize(factoryIdentifier),
sinkBuilderSupplier,
getDocumentType(config),
+ config.getRetryOnConflict(),
getLocalTimeZoneId(context.getConfiguration()));
}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
index bddc6cb..f91aa9b 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
@@ -50,17 +50,20 @@ class RowElasticsearchEmitter implements
ElasticsearchEmitter<RowData> {
private final XContentType contentType;
@Nullable private final String documentType;
private final Function<RowData, String> createKey;
+ private final int retryOnConflict;
public RowElasticsearchEmitter(
IndexGenerator indexGenerator,
SerializationSchema<RowData> serializationSchema,
XContentType contentType,
@Nullable String documentType,
+ int retryOnConflict,
Function<RowData, String> createKey) {
this.indexGenerator = checkNotNull(indexGenerator);
this.serializationSchema = checkNotNull(serializationSchema);
this.contentType = checkNotNull(contentType);
this.documentType = documentType;
+ this.retryOnConflict = retryOnConflict;
this.createKey = checkNotNull(createKey);
}
@@ -110,6 +113,9 @@ class RowElasticsearchEmitter implements
ElasticsearchEmitter<RowData> {
new UpdateRequest(indexGenerator.generate(row),
documentType, key)
.doc(document, contentType)
.upsert(document, contentType);
+ if (retryOnConflict != -1) {
+ updateRequest.retryOnConflict(retryOnConflict);
+ }
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =