This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab2a4d21e [INLONG-7417][Sort] Use SinkTableMetricData instead of SinkMetricData in IcebergSingleStreamWriter (#7418)
ab2a4d21e is described below

commit ab2a4d21ece934597355ff7b3c1217d983938d56
Author: LinChen <[email protected]>
AuthorDate: Sat Feb 25 15:46:38 2023 +0800

    [INLONG-7417][Sort] Use SinkTableMetricData instead of SinkMetricData in IcebergSingleStreamWriter (#7418)
---
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |   3 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java | 106 ++++++++++++++++++++-
 .../sink/multiple/IcebergSingleStreamWriter.java   |  44 ++++++---
 3 files changed, 135 insertions(+), 18 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index e816bc491..0fd079f59 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -716,7 +716,8 @@ public class FlinkSink {
                         appendMode);
 
         return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
-                table.name(), taskWriterFactory, inlongMetric, 
auditHostAndPorts, null, dirtyOptions, dirtySink));
+                table.name(), taskWriterFactory, inlongMetric, 
auditHostAndPorts,
+                null, dirtyOptions, dirtySink, false));
     }
 
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 3022ec6c0..db784c14b 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -35,15 +40,23 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -58,6 +71,11 @@ import static 
org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Iceberg writer that can distinguish different sink tables and route and 
distribute data into different
@@ -86,6 +104,11 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
+    private transient SinkTableMetricData sinkMetricData;
+    private transient MetricState metricState;
+    private transient ListState<MetricState> metricStateListState;
+    private transient RuntimeContext runtimeContext;
+
     public IcebergMultipleStreamWriter(
             boolean appendMode,
             CatalogLoader catalogLoader,
@@ -109,6 +132,21 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
         this.multipleWriters = new HashMap<>();
         this.multipleTables = new HashMap<>();
         this.multipleSchemas = new HashMap<>();
+
+        this.runtimeContext = getRuntimeContext();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkTableMetricData(metricOption, 
runtimeContext.getMetricGroup());
+            sinkMetricData.registerSubMetricsGroup(metricState);
+        }
     }
 
     @Override
@@ -183,7 +221,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
                         
.append(Constants.TABLE_NAME).append("=").append(tableId.name());
                 IcebergSingleStreamWriter<RowData> writer = new 
IcebergSingleStreamWriter<>(
                         tableId.toString(), taskWriterFactory, 
subWriterInlongMetric.toString(),
-                        auditHostAndPorts, flinkRowType, dirtyOptions, 
dirtySink);
+                        auditHostAndPorts, flinkRowType, dirtyOptions, 
dirtySink, true);
                 writer.setup(getRuntimeContext(),
                         new CallbackCollector<>(
                                 writeResult -> collector.collect(new 
MultipleWriteResult(tableId, writeResult))),
@@ -201,7 +239,55 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
 
         if (multipleWriters.get(tableId) != null) {
             for (RowData data : recordWithSchema.getData()) {
-                multipleWriters.get(tableId).processElement(data);
+                String dataBaseName = tableId.namespace().toString();
+                String tableName = tableId.name();
+                long size = data == null ? 0 : 
data.toString().getBytes(StandardCharsets.UTF_8).length;
+
+                try {
+                    multipleWriters.get(tableId).processElement(data);
+                } catch (Exception e) {
+                    LOG.error(String.format("write error, raw data: %s", 
data), e);
+                    if (!dirtyOptions.ignoreDirty()) {
+                        throw e;
+                    }
+                    if (dirtySink != null) {
+                        DirtyData.Builder<Object> builder = 
DirtyData.builder();
+                        try {
+                            String dirtyLabel = 
DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
+                                    DirtyType.BATCH_LOAD_ERROR, null,
+                                    dataBaseName, tableName, null);
+                            String dirtyLogTag =
+                                    
DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
+                                            DirtyType.BATCH_LOAD_ERROR, null,
+                                            dataBaseName, tableName, null);
+                            String dirtyIdentifier =
+                                    
DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
+                                            DirtyType.BATCH_LOAD_ERROR, null,
+                                            dataBaseName, tableName, null);
+                            builder.setData(data)
+                                    .setLabels(dirtyLabel)
+                                    .setLogTag(dirtyLogTag)
+                                    .setIdentifier(dirtyIdentifier)
+                                    
.setRowType(multipleWriters.get(tableId).getFlinkRowType())
+                                    .setDirtyMessage(e.getMessage());
+                            dirtySink.invoke(builder.build());
+                            if (sinkMetricData != null) {
+                                
sinkMetricData.outputDirtyMetricsWithEstimate(dataBaseName,
+                                        tableName, 1, size);
+                            }
+                        } catch (Exception ex) {
+                            if (!dirtyOptions.ignoreSideOutputErrors()) {
+                                throw new RuntimeException(ex);
+                            }
+                            LOG.warn("Dirty sink failed", ex);
+                        }
+                    }
+                    return;
+                }
+
+                if (sinkMetricData != null) {
+                    sinkMetricData.outputMetrics(dataBaseName, tableName, 1, 
size);
+                }
             }
         } else {
             LOG.error("Unregistered table schema for {}.", 
recordWithSchema.getTableId());
@@ -217,14 +303,26 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-        for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry 
: multipleWriters.entrySet()) {
-            entry.getValue().snapshotState(context);
+        if (sinkMetricData != null && metricStateListState != null) {
+            
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
sinkMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
         }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws 
Exception {
         this.functionInitializationContext = context;
+        if (this.inlongMetric != null) {
+            this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint<MetricState>() {
+                            })));
+
+        }
+        if (context.isRestored()) {
+            metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+        }
     }
 
     private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index dc30c5b21..a2117daf1 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -74,6 +74,8 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
+    private boolean multipleSink;
+
     public IcebergSingleStreamWriter(
             String fullTableName,
             TaskWriterFactory<T> taskWriterFactory,
@@ -81,7 +83,8 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
             String auditHostAndPorts,
             @Nullable RowType flinkRowType,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean multipleSink) {
         this.fullTableName = fullTableName;
         this.taskWriterFactory = taskWriterFactory;
         this.inlongMetric = inlongMetric;
@@ -89,6 +92,11 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
         this.flinkRowType = flinkRowType;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.multipleSink = multipleSink;
+    }
+
+    public RowType getFlinkRowType() {
+        return flinkRowType;
     }
 
     @Override
@@ -102,17 +110,19 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
         this.writer = taskWriterFactory.create();
 
         // Initialize metric
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
-                .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
-                .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
-                .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
-                .withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+        if (!multipleSink) {
+            MetricOption metricOption = MetricOption.builder()
+                    .withInlongLabels(inlongMetric)
+                    .withInlongAudit(auditHostAndPorts)
+                    .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                    .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                    .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                    .withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                    .withRegisterMetric(RegisteredMetric.ALL)
+                    .build();
+            if (metricOption != null) {
+                metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            }
         }
     }
 
@@ -128,6 +138,10 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
         try {
             writer.write(value);
         } catch (Exception e) {
+            if (multipleSink) {
+                throw e;
+            }
+
             LOGGER.error(String.format("write error, raw data: %s", value), e);
             if (!dirtyOptions.ignoreDirty()) {
                 throw e;
@@ -152,6 +166,7 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
                     LOGGER.warn("Dirty sink failed", ex);
                 }
             }
+            return;
         }
         if (metricData != null) {
             metricData.invokeWithEstimate(value == null ? "" : value);
@@ -161,10 +176,13 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
     @Override
     public void initializeState(FunctionInitializationContext context) throws 
Exception {
         // init metric state
+        if (multipleSink) {
+            return;
+        }
         if (this.inlongMetric != null) {
             this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
                     new ListStateDescriptor<>(
-                            String.format("Iceberg(%s)-" + 
INLONG_METRIC_STATE_NAME, fullTableName),
+                            String.format(INLONG_METRIC_STATE_NAME, 
fullTableName),
                             TypeInformation.of(new TypeHint<MetricState>() {
                             })));
         }

Reply via email to