This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab2a4d21e [INLONG-7417][Sort] Use SinkTableMetricData instead of
SinkMetricData in IcebergSingleStreamWriter (#7418)
ab2a4d21e is described below
commit ab2a4d21ece934597355ff7b3c1217d983938d56
Author: LinChen <[email protected]>
AuthorDate: Sat Feb 25 15:46:38 2023 +0800
[INLONG-7417][Sort] Use SinkTableMetricData instead of SinkMetricData in
IcebergSingleStreamWriter (#7418)
---
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +-
.../sink/multiple/IcebergMultipleStreamWriter.java | 106 ++++++++++++++++++++-
.../sink/multiple/IcebergSingleStreamWriter.java | 44 ++++++---
3 files changed, 135 insertions(+), 18 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index e816bc491..0fd079f59 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -716,7 +716,8 @@ public class FlinkSink {
appendMode);
return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
- table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts, null, dirtyOptions, dirtySink));
+ table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
+ null, dirtyOptions, dirtySink, false));
}
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 3022ec6c0..db784c14b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -17,6 +17,11 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -35,15 +40,23 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -58,6 +71,11 @@ import static
org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
/**
* Iceberg writer that can distinguish different sink tables and route and
distribute data into different
@@ -86,6 +104,11 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private transient SinkTableMetricData sinkMetricData;
+ private transient MetricState metricState;
+ private transient ListState<MetricState> metricStateListState;
+ private transient RuntimeContext runtimeContext;
+
public IcebergMultipleStreamWriter(
boolean appendMode,
CatalogLoader catalogLoader,
@@ -109,6 +132,21 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
this.multipleWriters = new HashMap<>();
this.multipleTables = new HashMap<>();
this.multipleSchemas = new HashMap<>();
+
+ this.runtimeContext = getRuntimeContext();
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+ .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkTableMetricData(metricOption,
runtimeContext.getMetricGroup());
+ sinkMetricData.registerSubMetricsGroup(metricState);
+ }
}
@Override
@@ -183,7 +221,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
.append(Constants.TABLE_NAME).append("=").append(tableId.name());
IcebergSingleStreamWriter<RowData> writer = new
IcebergSingleStreamWriter<>(
tableId.toString(), taskWriterFactory,
subWriterInlongMetric.toString(),
- auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink);
+ auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink, true);
writer.setup(getRuntimeContext(),
new CallbackCollector<>(
writeResult -> collector.collect(new
MultipleWriteResult(tableId, writeResult))),
@@ -201,7 +239,55 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
if (multipleWriters.get(tableId) != null) {
for (RowData data : recordWithSchema.getData()) {
- multipleWriters.get(tableId).processElement(data);
+ String dataBaseName = tableId.namespace().toString();
+ String tableName = tableId.name();
+ long size = data == null ? 0 :
data.toString().getBytes(StandardCharsets.UTF_8).length;
+
+ try {
+ multipleWriters.get(tableId).processElement(data);
+ } catch (Exception e) {
+ LOG.error(String.format("write error, raw data: %s",
data), e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw e;
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder =
DirtyData.builder();
+ try {
+ String dirtyLabel =
DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
+ DirtyType.BATCH_LOAD_ERROR, null,
+ dataBaseName, tableName, null);
+ String dirtyLogTag =
+
DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
+ DirtyType.BATCH_LOAD_ERROR, null,
+ dataBaseName, tableName, null);
+ String dirtyIdentifier =
+
DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
+ DirtyType.BATCH_LOAD_ERROR, null,
+ dataBaseName, tableName, null);
+ builder.setData(data)
+ .setLabels(dirtyLabel)
+ .setLogTag(dirtyLogTag)
+ .setIdentifier(dirtyIdentifier)
+
.setRowType(multipleWriters.get(tableId).getFlinkRowType())
+ .setDirtyMessage(e.getMessage());
+ dirtySink.invoke(builder.build());
+ if (sinkMetricData != null) {
+
sinkMetricData.outputDirtyMetricsWithEstimate(dataBaseName,
+ tableName, 1, size);
+ }
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ return;
+ }
+
+ if (sinkMetricData != null) {
+ sinkMetricData.outputMetrics(dataBaseName, tableName, 1,
size);
+ }
}
} else {
LOG.error("Unregistered table schema for {}.",
recordWithSchema.getTableId());
@@ -217,14 +303,26 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry : multipleWriters.entrySet()) {
- entry.getValue().snapshotState(context);
+ if (sinkMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState,
sinkMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
this.functionInitializationContext = context;
+ if (this.inlongMetric != null) {
+ this.metricStateListState =
context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new
TypeHint<MetricState>() {
+ })));
+
+ }
+ if (context.isRestored()) {
+ metricState =
MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
+ }
}
private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index dc30c5b21..a2117daf1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -74,6 +74,8 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private boolean multipleSink;
+
public IcebergSingleStreamWriter(
String fullTableName,
TaskWriterFactory<T> taskWriterFactory,
@@ -81,7 +83,8 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
String auditHostAndPorts,
@Nullable RowType flinkRowType,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean multipleSink) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
this.inlongMetric = inlongMetric;
@@ -89,6 +92,11 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
this.flinkRowType = flinkRowType;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.multipleSink = multipleSink;
+ }
+
+ public RowType getFlinkRowType() {
+ return flinkRowType;
}
@Override
@@ -102,17 +110,19 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
this.writer = taskWriterFactory.create();
// Initialize metric
- MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
- .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
- .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
- .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
- .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
- .withRegisterMetric(RegisteredMetric.ALL)
- .build();
- if (metricOption != null) {
- metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ if (!multipleSink) {
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ }
}
}
@@ -128,6 +138,10 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
try {
writer.write(value);
} catch (Exception e) {
+ if (multipleSink) {
+ throw e;
+ }
+
LOGGER.error(String.format("write error, raw data: %s", value), e);
if (!dirtyOptions.ignoreDirty()) {
throw e;
@@ -152,6 +166,7 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
LOGGER.warn("Dirty sink failed", ex);
}
}
+ return;
}
if (metricData != null) {
metricData.invokeWithEstimate(value == null ? "" : value);
@@ -161,10 +176,13 @@ public class IcebergSingleStreamWriter<T> extends
IcebergProcessFunction<T, Writ
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
// init metric state
+ if (multipleSink) {
+ return;
+ }
if (this.inlongMetric != null) {
this.metricStateListState =
context.getOperatorStateStore().getUnionListState(
new ListStateDescriptor<>(
- String.format("Iceberg(%s)-" + INLONG_METRIC_STATE_NAME, fullTableName),
+ String.format(INLONG_METRIC_STATE_NAME, fullTableName),
TypeInformation.of(new TypeHint<MetricState>() {
})));
}