This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f718ef32a [INLONG-6657][Sort] Add dirty data metric for Hive (#6658)
f718ef32a is described below

commit f718ef32a2c32cf1b0aef6a55ab7644360a2bdc7
Author: haifxu <[email protected]>
AuthorDate: Tue Dec 6 10:44:28 2022 +0800

    [INLONG-6657][Sort] Add dirty data metric for Hive (#6658)
---
 .../inlong/sort/base/metric/SinkMetricData.java    |  5 ++++
 .../hive/filesystem/AbstractStreamingWriter.java   | 34 ++++++++++++++++------
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index e5957be4c..9cd78c498 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -258,6 +258,11 @@ public class SinkMetricData implements MetricData {
         invoke(1, size);
     }
 
+    public void invokeDirtyWithEstimate(Object o) {
+        long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
+        invokeDirty(1, size);
+    }
+
     public void invoke(long rowCount, long rowSize) {
         if (numRecordsOut != null) {
             numRecordsOut.inc(rowCount);
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 1c0574661..f032f3ab1 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -44,6 +44,8 @@ import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -96,7 +98,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
-    /** Notifies a partition created. */
+    /**
+     * Notifies a partition created.
+     */
     protected abstract void partitionCreated(String partition);
 
     /**
@@ -113,7 +117,9 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
      */
     protected abstract void onPartFileOpened(String partition, Path newPath);
 
-    /** Commit up to this checkpoint id. */
+    /**
+     * Commit up to this checkpoint id.
+     */
     protected void commitUpToCheckpoint(long checkpointId) throws Exception {
         helper.commitUpToCheckpoint(checkpointId);
     }
@@ -126,6 +132,8 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -196,13 +204,21 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
 
     @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
-        helper.onElement(
-                element.getValue(),
-                getProcessingTimeService().getCurrentProcessingTime(),
-                element.hasTimestamp() ? element.getTimestamp() : null,
-                currentWatermark);
-        if (metricData != null) {
-            metricData.invokeWithEstimate(element.getValue());
+        try {
+            helper.onElement(
+                    element.getValue(),
+                    getProcessingTimeService().getCurrentProcessingTime(),
+                    element.hasTimestamp() ? element.getTimestamp() : null,
+                    currentWatermark);
+            if (metricData != null) {
+                metricData.invokeWithEstimate(element.getValue());
+            }
+        } catch (Exception e) {
+            if (metricData != null) {
+                metricData.invokeDirtyWithEstimate(element.getValue());
+            }
+            // TODO: to support dirty data side-output
+            throw new RuntimeException(e);
         }
     }
 

Reply via email to