This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f718ef32a [INLONG-6657][Sort] Add dirty data metric for Hive (#6658)
f718ef32a is described below
commit f718ef32a2c32cf1b0aef6a55ab7644360a2bdc7
Author: haifxu <[email protected]>
AuthorDate: Tue Dec 6 10:44:28 2022 +0800
[INLONG-6657][Sort] Add dirty data metric for Hive (#6658)
---
.../inlong/sort/base/metric/SinkMetricData.java | 5 ++++
.../hive/filesystem/AbstractStreamingWriter.java | 34 ++++++++++++++++------
2 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index e5957be4c..9cd78c498 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -258,6 +258,11 @@ public class SinkMetricData implements MetricData {
invoke(1, size);
}
+ public void invokeDirtyWithEstimate(Object o) {
+ long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
+ invokeDirty(1, size);
+ }
+
public void invoke(long rowCount, long rowSize) {
if (numRecordsOut != null) {
numRecordsOut.inc(rowCount);
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 1c0574661..f032f3ab1 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -44,6 +44,8 @@ import org.apache.inlong.sort.base.util.MetricStateUtils;
import javax.annotation.Nullable;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -96,7 +98,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
setChainingStrategy(ChainingStrategy.ALWAYS);
}
- /** Notifies a partition created. */
+ /**
+ * Notifies a partition created.
+ */
protected abstract void partitionCreated(String partition);
/**
@@ -113,7 +117,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
*/
protected abstract void onPartFileOpened(String partition, Path newPath);
- /** Commit up to this checkpoint id. */
+ /**
+ * Commit up to this checkpoint id.
+ */
protected void commitUpToCheckpoint(long checkpointId) throws Exception {
helper.commitUpToCheckpoint(checkpointId);
}
@@ -126,6 +132,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
.withInlongAudit(auditHostAndPorts)
.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -196,13 +204,21 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
- helper.onElement(
- element.getValue(),
- getProcessingTimeService().getCurrentProcessingTime(),
- element.hasTimestamp() ? element.getTimestamp() : null,
- currentWatermark);
- if (metricData != null) {
- metricData.invokeWithEstimate(element.getValue());
+ try {
+ helper.onElement(
+ element.getValue(),
+ getProcessingTimeService().getCurrentProcessingTime(),
+ element.hasTimestamp() ? element.getTimestamp() : null,
+ currentWatermark);
+ if (metricData != null) {
+ metricData.invokeWithEstimate(element.getValue());
+ }
+ } catch (Exception e) {
+ if (metricData != null) {
+ metricData.invokeDirtyWithEstimate(element.getValue());
+ }
+ // TODO: to support dirty data side-output
+ throw new RuntimeException(e);
}
}