This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b7c6f49d7 [INLONG-6575][Sort] Add dirty data metric for Filesystem
(#6726)
b7c6f49d7 is described below
commit b7c6f49d7fd1cd6c86419f991bb74426ea04252f
Author: Xin Gong <[email protected]>
AuthorDate: Tue Dec 6 11:51:31 2022 +0800
[INLONG-6575][Sort] Add dirty data metric for Filesystem (#6726)
---
.../filesystem/stream/AbstractStreamingWriter.java | 46 +++++++++++++++++-----
1 file changed, 36 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 568b639f1..e5e74e62a 100644
---
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -42,6 +42,10 @@ import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -78,7 +82,8 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
private transient long currentWatermark;
- private SinkMetricData metricData;
+ private Long dataSize = 0L;
+ private Long rowSize = 0L;
public AbstractStreamingWriter(
long bucketCheckInterval,
@@ -118,6 +123,8 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
.withInlongAudit(inlongAudit)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -129,7 +136,20 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
* Commit up to this checkpoint id.
*/
protected void commitUpToCheckpoint(long checkpointId) throws Exception {
- helper.commitUpToCheckpoint(checkpointId);
+ try {
+ helper.commitUpToCheckpoint(checkpointId);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowSize, dataSize);
+ }
+ } catch (Exception e) {
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ LOG.error("fileSystem sink commitUpToCheckpoint.", e);
+ } finally {
+ rowSize = 0L;
+ dataSize = 0L;
+ }
}
@Override
@@ -192,14 +212,20 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
}
@Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- helper.onElement(
- element.getValue(),
- getProcessingTimeService().getCurrentProcessingTime(),
- element.hasTimestamp() ? element.getTimestamp() : null,
- currentWatermark);
- if (metricData != null) {
- metricData.invokeWithEstimate(element.getValue());
+ public void processElement(StreamRecord<IN> element) {
+ try {
+ helper.onElement(
+ element.getValue(),
+ getProcessingTimeService().getCurrentProcessingTime(),
+ element.hasTimestamp() ? element.getTimestamp() : null,
+ currentWatermark);
+ rowSize = rowSize + 1;
+ dataSize = dataSize +
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+ } catch (Exception e) {
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1L,
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ LOG.error("fileSystem sink processElement.", e);
}
}