This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d71e79013 [INLONG-7809][Sort] ES multiple sink support dirty data runtime strategies (#7812)
d71e79013 is described below

commit d71e7901339773f2be241a929243e69a317c6033
Author: Yizhou Yang <[email protected]>
AuthorDate: Tue Apr 11 15:59:56 2023 +0800

    [INLONG-7809][Sort] ES multiple sink support dirty data runtime strategies (#7812)
    
    Co-authored-by: Yizhou Yang <[email protected]>
---
 .../sort/base/metric/sub/SinkTableMetricData.java  | 48 ++++++++++++
 .../sort/elasticsearch6/ElasticsearchSink.java     | 14 +++-
 .../table/Elasticsearch6DynamicSink.java           |  1 +
 .../sort/elasticsearch7/ElasticsearchSink.java     | 14 +++-
 .../table/Elasticsearch7DynamicSink.java           |  1 +
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 13 +++-
 .../MultipleElasticsearchSinkFunctionBase.java     | 91 +++++++++++++---------
 7 files changed, 138 insertions(+), 44 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index 40a869d87..92f9da48f 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -223,6 +223,30 @@ public class SinkTableMetricData extends SinkMetricData 
implements SinkSubMetric
         subSinkMetricData.invoke(rowCount, rowSize);
     }
 
+    /**
+     * output metrics
+     *
+     * @param index the index name of record
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputMetrics(String index, long rowCount, long rowSize) {
+        if (StringUtils.isBlank(index)) {
+            invoke(rowCount, rowSize);
+            return;
+        }
+        SinkMetricData subSinkMetricData;
+        if (subSinkMetricMap.containsKey(index)) {
+            subSinkMetricData = subSinkMetricMap.get(index);
+        } else {
+            subSinkMetricData = buildSubSinkMetricData(new String[]{index}, 
this);
+            subSinkMetricMap.put(index, subSinkMetricData);
+        }
+        // sink metric and sub sink metric output metrics
+        this.invoke(rowCount, rowSize);
+        subSinkMetricData.invoke(rowCount, rowSize);
+    }
+
     /**
      * output dirty metrics with estimate
      *
@@ -336,6 +360,30 @@ public class SinkTableMetricData extends SinkMetricData 
implements SinkSubMetric
         subSinkMetricData.invokeDirty(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics
+     *
+     * @param index the index name of record
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputDirtyMetrics(String index, long rowCount, long rowSize) {
+        if (StringUtils.isBlank(index)) {
+            invokeDirty(rowCount, rowSize);
+            return;
+        }
+        SinkMetricData subSinkMetricData;
+        if (subSinkMetricMap.containsKey(index)) {
+            subSinkMetricData = subSinkMetricMap.get(index);
+        } else {
+            subSinkMetricData = buildSubSinkMetricData(new String[]{index}, 
this);
+            subSinkMetricMap.put(index, subSinkMetricData);
+        }
+        // sink metric and sub sink metric output metrics
+        this.invokeDirty(rowCount, rowSize);
+        subSinkMetricData.invokeDirty(rowCount, rowSize);
+    }
+
     /**
      * output dirty metrics with estimate
      *
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index 15f60bfb8..e4f8ff37b 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -83,14 +83,16 @@ public class ElasticsearchSink<T>
             RestClientFactory restClientFactory,
             String inlongMetric,
             DirtySinkHelper<Object> dirtySinkHelper,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            boolean multipleSink) {
         super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
                 inlongMetric,
                 dirtySinkHelper,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                multipleSink);
     }
 
     @Override
@@ -131,6 +133,7 @@ public class ElasticsearchSink<T>
         private String inlongMetric = null;
         private DirtySinkHelper<Object> dirtySinkHelper;
         private String auditHostAndPorts;
+        private boolean multipleSink;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the 
cluster using a {@link
@@ -286,6 +289,10 @@ public class ElasticsearchSink<T>
             this.restClientFactory = 
Preconditions.checkNotNull(restClientFactory);
         }
 
+        public void setMultipleSink(boolean multipleSink) {
+            this.multipleSink = multipleSink;
+        }
+
         /**
          * Creates the Elasticsearch sink.
          * Use {@link DirtySinkFailureHandler} when need sink dirty data
@@ -304,7 +311,8 @@ public class ElasticsearchSink<T>
                     restClientFactory,
                     inlongMetric,
                     dirtySinkHelper,
-                    auditHostAndPorts);
+                    auditHostAndPorts,
+                    multipleSink);
         }
 
         @Override
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 3372896f5..15bed1d97 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -174,6 +174,7 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
                 builder.setRestClientFactory(
                         new 
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
             }
+            builder.setMultipleSink(multipleSink);
             final ElasticsearchSink<RowData> sink = builder.build();
             if (config.isDisableFlushOnCheckpoint()) {
                 sink.disableFlushOnCheckpoint();
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index ead02098e..5719dbbc4 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -83,14 +83,16 @@ public class ElasticsearchSink<T>
             RestClientFactory restClientFactory,
             String inlongMetric,
             DirtySinkHelper<Object> dirtySinkHelper,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            boolean multipleSink) {
         super(new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
                 inlongMetric,
                 dirtySinkHelper,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                multipleSink);
     }
 
     @Override
@@ -131,6 +133,7 @@ public class ElasticsearchSink<T>
         private String inlongMetric = null;
         private DirtySinkHelper<Object> dirtySinkHelper;
         private String auditHostAndPorts;
+        private boolean multipleSink;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the 
cluster using a {@link
@@ -286,6 +289,10 @@ public class ElasticsearchSink<T>
             this.restClientFactory = 
Preconditions.checkNotNull(restClientFactory);
         }
 
+        public void setMultipleSink(boolean multipleSink) {
+            this.multipleSink = multipleSink;
+        }
+
         /**
          * Creates the Elasticsearch sink.
          * Use {@link DirtySinkFailureHandler} when need sink dirty data
@@ -304,7 +311,8 @@ public class ElasticsearchSink<T>
                     restClientFactory,
                     inlongMetric,
                     dirtySinkHelper,
-                    auditHostAndPorts);
+                    auditHostAndPorts,
+                    multipleSink);
         }
 
         @Override
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 05e34ee4b..74f74e1a8 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -207,6 +207,7 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
                 builder.setRestClientFactory(
                         new 
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
             }
+            builder.setMultipleSink(multipleSink);
             final ElasticsearchSink<RowData> sink = builder.build();
             if (config.isDisableFlushOnCheckpoint()) {
                 sink.disableFlushOnCheckpoint();
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index b0efe1888..65faddf6a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import java.io.Serializable;
@@ -160,9 +161,10 @@ public abstract class ElasticsearchSinkBase<T, Request, 
Builder, Listener, BulkI
      * Bulk processor to buffer and send requests to Elasticsearch, created 
using the client.
      */
     private transient BulkProcessor bulkProcessor;
-    private SinkMetricData sinkMetricData;
+    private SinkTableMetricData sinkMetricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
+    private final boolean multipleSink;
 
     public ElasticsearchSinkBase(
             ElasticsearchApiCallBridge<Request, Builder, Listener, 
BulkItemResponse, BulkProcessor, C> callBridge,
@@ -171,13 +173,15 @@ public abstract class ElasticsearchSinkBase<T, Request, 
Builder, Listener, BulkI
             ActionRequestFailureHandler<Request> failureHandler,
             String inlongMetric,
             DirtySinkHelper<Object> dirtySinkHelper,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            boolean multipleSink) {
         this.inlongMetric = inlongMetric;
         this.dirtySinkHelper = dirtySinkHelper;
         this.callBridge = checkNotNull(callBridge);
         this.elasticsearchSinkFunction = 
checkNotNull(elasticsearchSinkFunction);
         this.failureHandler = checkNotNull(failureHandler);
         this.auditHostAndPorts = auditHostAndPorts;
+        this.multipleSink = multipleSink;
         // we eagerly check if the user-provided sink function and failure 
handler is serializable;
         // otherwise, if they aren't serializable, users will merely get a 
non-informative error
         // message
@@ -266,7 +270,10 @@ public abstract class ElasticsearchSinkBase<T, Request, 
Builder, Listener, BulkI
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            sinkMetricData = new SinkTableMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            if (multipleSink) {
+                sinkMetricData.registerSubMetricsGroup(metricState);
+            }
         }
         dirtySinkHelper.open(parameters);
         Listener listener = createListener();
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
index 4e54e553e..2ff512704 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
 
+import java.util.HashSet;
 import java.util.UUID;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
@@ -36,6 +37,7 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.inlong.sort.elasticsearch.RequestIndexer;
@@ -74,9 +76,11 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
     // open and store an index generator for each new index.
     private Map<String, IndexGenerator> indexGeneratorMap;
     // table level metrics
-    private SinkMetricData sinkMetricData;
+    private SinkTableMetricData sinkMetricData;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
     private transient SerializationSchema<RowData> serializationSchema;
+    // a hashset containing indices which are skipped due to exceptions.
+    private final HashSet<String> errorSet = new HashSet<>();
 
     public MultipleElasticsearchSinkFunctionBase(
             @Nullable String docType, // this is deprecated in es 7+
@@ -106,12 +110,12 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
     @Override
     public void open(RuntimeContext ctx, SinkMetricData sinkMetricData) {
         indexGeneratorMap = new HashMap<>();
-        this.sinkMetricData = sinkMetricData;
+        this.sinkMetricData = (SinkTableMetricData) sinkMetricData;
     }
 
-    private void sendMetrics(byte[] document) {
+    private void sendMetrics(byte[] document, String index) {
         if (sinkMetricData != null) {
-            sinkMetricData.invoke(1, document.length);
+            sinkMetricData.outputMetrics(index, 1, document.length);
         }
     }
 
@@ -125,7 +129,7 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
 
     @Override
     public void process(RowData element, RuntimeContext ctx, 
RequestIndexer<Request> indexer) {
-        JsonNode rootNode = null;
+        JsonNode rootNode;
         // parse rootnode
         try {
             jsonDynamicSchemaFormat =
@@ -155,10 +159,15 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
             document = serializationSchema.serialize(data);
         } catch (Exception e) {
             LOGGER.error(String.format("Serialize error, raw data: %s", data), 
e);
-            dirtySinkHelper.invoke(data, DirtyType.SERIALIZE_ERROR, e);
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(1, 
data.toString().getBytes(StandardCharsets.UTF_8).length);
-            }
+            handleDirty(data, DirtyType.SERIALIZE_ERROR, e, null);
+            return;
+        }
+        final String index;
+        try {
+            index = parseIndex(data, rootNode);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Generate index error, raw data: %s", 
data), e);
+            handleDirty(data, DirtyType.INDEX_GENERATE_ERROR, e, null);
             return;
         }
         final String key;
@@ -168,26 +177,44 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
             key = 
UUID.nameUUIDFromBytes(physicalData.toString().getBytes(StandardCharsets.UTF_8)).toString();
         } catch (Exception e) {
             LOGGER.error(String.format("Generate index id error, raw data: 
%s", data), e);
-            dirtySinkHelper.invoke(data, DirtyType.INDEX_ID_GENERATE_ERROR, e);
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(1, document.length);
-            }
+            handleDirty(data, DirtyType.INDEX_ID_GENERATE_ERROR, e, index);
             return;
         }
-        final String index;
-        try {
-            index = parseIndex(data, rootNode);
-        } catch (Exception e) {
-            LOGGER.error(String.format("Generate index error, raw data: %s", 
data), e);
-            dirtySinkHelper.invoke(data, DirtyType.INDEX_GENERATE_ERROR, e);
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(1, document.length);
-            }
+        // if the index is contained in errorset, then skip this record.
+        if (errorSet.contains(index)) {
             return;
         }
         addDocument(data, key, index, document, indexer);
     }
 
+    private void handleDirty(RowData rowData, DirtyType dirtyType, Exception 
e, String index) {
+        // skip the index in which the error has occurred
+        if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == 
schemaUpdateExceptionPolicy) {
+            if (index != null) {
+                errorSet.add(index);
+            } else {
+                return;
+            }
+        }
+
+        // keep retrying the entire task until it succeeds
+        if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP == 
schemaUpdateExceptionPolicy) {
+            throw new RuntimeException(String.format("Writing records %s 
failed, restarting task",
+                    rowData), e);
+        }
+
+        // dirty data & archive
+        if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdateExceptionPolicy) {
+            dirtySinkHelper.invoke(rowData, dirtyType, e);
+            if (sinkMetricData != null && index != null) {
+                sinkMetricData.outputDirtyMetrics(index, 1,
+                        
rowData.toString().getBytes(StandardCharsets.UTF_8).length);
+            } else {
+                sinkMetricData.invokeDirty(1, 
rowData.toString().getBytes(StandardCharsets.UTF_8).length);
+            }
+        }
+    }
+
     private String parseIndex(RowData rowData, JsonNode rootNode)
             throws Exception {
         String index;
@@ -219,13 +246,13 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
                     request = requestFactory.createUpdateRequest(index, 
docType, key, contentType, document);
                     if (addRouting(request, element, document)) {
                         indexer.add(request);
-                        sendMetrics(document);
+                        sendMetrics(document, index);
                     }
                 } else {
                     request = requestFactory.createIndexRequest(index, 
docType, key, contentType, document);
                     if (addRouting(request, element, document)) {
                         indexer.add(request);
-                        sendMetrics(document);
+                        sendMetrics(document, index);
                     }
                 }
                 break;
@@ -233,19 +260,16 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
                 request = requestFactory.createDeleteRequest(index, docType, 
key);
                 if (addRouting(request, element, document)) {
                     indexer.add(request);
-                    sendMetrics(document);
+                    sendMetrics(document, index);
                 }
                 break;
             case UPDATE_BEFORE:
-                sendMetrics(document);
+                sendMetrics(document, index);
                 break;
             default:
                 LOGGER.error(String.format("The type of element should be 
'RowData' only, raw data: %s", element));
-                dirtySinkHelper.invoke(element, 
DirtyType.UNSUPPORTED_DATA_TYPE,
-                        new RuntimeException("The type of element should be 
'RowData' only."));
-                if (sinkMetricData != null) {
-                    sinkMetricData.invokeDirty(1, document.length);
-                }
+                handleDirty(element, DirtyType.UNSUPPORTED_DATA_TYPE,
+                        new RuntimeException("The type of element should be 
'RowData' only."), index);
         }
     }
 
@@ -256,10 +280,7 @@ public abstract class 
MultipleElasticsearchSinkFunctionBase<Request, ContentType
                 handleRouting(request, routing);
             } catch (Exception e) {
                 LOGGER.error(String.format("Routing error, raw data: %s", 
row), e);
-                dirtySinkHelper.invoke(row, DirtyType.INDEX_ROUTING_ERROR, e);
-                if (sinkMetricData != null) {
-                    sinkMetricData.invokeDirty(1, document.length);
-                }
+                handleDirty(row, DirtyType.INDEX_ROUTING_ERROR, e, null);
                 return false;
             }
         }

Reply via email to