This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 04c9893  [FLINK-38371][Connectors/ElasticSearch] Add `sink.retry-on-conflicts` option to Elasticsearch Sink (#132)
04c9893 is described below

commit 04c98933da231ab11a33c19968a1fe285d205276
Author: Liu Jiangang <[email protected]>
AuthorDate: Tue Mar 10 20:03:26 2026 +0800

    [FLINK-38371][Connectors/ElasticSearch] Add `sink.retry-on-conflicts` option to Elasticsearch Sink (#132)
    
    Co-authored-by: garyjgliu <[email protected]>
---
 .../connector/elasticsearch/table/ElasticsearchConfiguration.java  | 5 +++++
 .../elasticsearch/table/ElasticsearchConnectorOptions.java         | 7 +++++++
 .../connector/elasticsearch/table/ElasticsearchDynamicSink.java    | 5 +++++
 .../elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java  | 1 +
 .../connector/elasticsearch/table/RowElasticsearchEmitter.java     | 6 ++++++
 5 files changed, 24 insertions(+)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index ac0c291..6c74200 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -46,6 +46,7 @@ import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICTS;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
@@ -130,6 +131,10 @@ public class ElasticsearchConfiguration {
         return config.getOptional(SINK_PARALLELISM);
     }
 
+    public int getRetryOnConflict() {
+        return config.get(RETRY_ON_CONFLICTS);
+    }
+
     /**
      * Parse Hosts String to list.
      *
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 10ea0ae..f34cb87 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -145,4 +145,11 @@ public class ElasticsearchConnectorOptions {
                     .enumType(DeliveryGuarantee.class)
                     .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when 
committing.");
+
+    public static final ConfigOption<Integer> RETRY_ON_CONFLICTS =
+            ConfigOptions.key("sink.retry-on-conflicts")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The number of retry when conflicts with 
concurrent requests.");
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index 0fd389b..d6d04a6 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -62,6 +62,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
     final ElasticsearchSinkBuilderSupplier<RowData> builderSupplier;
     @Nullable final String documentType;
     final boolean isDynamicIndexWithSystemTime;
+    final int retryOnConflict;
 
     ElasticsearchDynamicSink(
             EncodingFormat<SerializationSchema<RowData>> format,
@@ -71,6 +72,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
             String summaryString,
             ElasticsearchSinkBuilderSupplier<RowData> builderSupplier,
             @Nullable String documentType,
+            int retryOnConflict,
             ZoneId localTimeZoneId) {
         this.format = checkNotNull(format);
         this.physicalRowDataType = checkNotNull(physicalRowDataType);
@@ -79,6 +81,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
         this.summaryString = checkNotNull(summaryString);
         this.builderSupplier = checkNotNull(builderSupplier);
         this.documentType = documentType;
+        this.retryOnConflict = retryOnConflict;
         this.localTimeZoneId = localTimeZoneId;
         this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime();
     }
@@ -127,6 +130,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
                         format,
                         XContentType.JSON,
                         documentType,
+                        retryOnConflict,
                         createKeyExtractor());
 
         ElasticsearchSinkBuilderBase<RowData, ? extends 
ElasticsearchSinkBuilderBase> builder =
@@ -187,6 +191,7 @@ class ElasticsearchDynamicSink implements DynamicTableSink {
                 summaryString,
                 builderSupplier,
                 documentType,
+                retryOnConflict,
                 localTimeZoneId);
     }
 
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index 0395eaf..2686e2e 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -128,6 +128,7 @@ abstract class ElasticsearchDynamicTableFactoryBase
                 capitalize(factoryIdentifier),
                 sinkBuilderSupplier,
                 getDocumentType(config),
+                config.getRetryOnConflict(),
                 getLocalTimeZoneId(context.getConfiguration()));
     }
 
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
index bddc6cb..f91aa9b 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java
@@ -50,17 +50,20 @@ class RowElasticsearchEmitter implements 
ElasticsearchEmitter<RowData> {
     private final XContentType contentType;
     @Nullable private final String documentType;
     private final Function<RowData, String> createKey;
+    private final int retryOnConflict;
 
     public RowElasticsearchEmitter(
             IndexGenerator indexGenerator,
             SerializationSchema<RowData> serializationSchema,
             XContentType contentType,
             @Nullable String documentType,
+            int retryOnConflict,
             Function<RowData, String> createKey) {
         this.indexGenerator = checkNotNull(indexGenerator);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.contentType = checkNotNull(contentType);
         this.documentType = documentType;
+        this.retryOnConflict = retryOnConflict;
         this.createKey = checkNotNull(createKey);
     }
 
@@ -110,6 +113,9 @@ class RowElasticsearchEmitter implements 
ElasticsearchEmitter<RowData> {
                     new UpdateRequest(indexGenerator.generate(row), 
documentType, key)
                             .doc(document, contentType)
                             .upsert(document, contentType);
+            if (retryOnConflict != -1) {
+                updateRequest.retryOnConflict(retryOnConflict);
+            }
             indexer.add(updateRequest);
         } else {
             final IndexRequest indexRequest =

Reply via email to