This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f4093967ae [INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970)
f4093967ae is described below
commit f4093967ae7d21ab6c0eba79bf5648ea38fbaff9
Author: vernedeng <[email protected]>
AuthorDate: Fri Sep 22 16:21:14 2023 +0800
[INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970)
* [INLONG-8969][Sort] Optimize Iceberg Source audit report logic
---
.../inlong/sort/base/metric/SourceMetricData.java | 6 -----
.../sort/base/util/CalculateObjectSizeUtils.java | 7 -----
.../inlong/sort/iceberg/source/IcebergSource.java | 4 +--
.../iceberg/source/reader/IcebergSourceReader.java | 2 +-
.../source/reader/IcebergSourceSplitReader.java | 4 +--
.../reader/InlongIcebergSourceReaderMetrics.java | 31 +++++++++++++++++++---
.../source/reader/RowDataRecordFactory.java | 2 +-
.../source/utils/RecyclableJoinedRowData.java | 9 ++++++-
8 files changed, 42 insertions(+), 23 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index c77f688045..ec64f53b39 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -38,7 +38,6 @@ import static
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
-import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize;
import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
/**
@@ -243,11 +242,6 @@ public class SourceMetricData implements MetricData {
outputMetrics(1, getDataSize(data));
}
- public void outputMetricsWithEstimate(Object[] records) {
- long size = getDataArraySize(records);
- outputMetrics(records.length, size);
- }
-
public void outputMetricsWithEstimate(Object data, long fetchDelay, long
emitDelay) {
outputMetrics(1, getDataSize(data));
this.fetchDelay = fetchDelay;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
index 748522693b..0826eb2a9e 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.base.util;
import org.apache.flink.table.data.binary.BinaryRowData;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
/**
* calculate tool for object
@@ -45,10 +44,4 @@ public class CalculateObjectSizeUtils {
return size;
}
- public static long getDataArraySize(Object[] objects) {
- return Arrays.stream(objects)
- .mapToLong(CalculateObjectSizeUtils::getDataSize)
- .sum();
- }
-
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
index d5b58e050c..adff22e7cd 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -164,8 +164,8 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
@Override
public SourceReader<T, IcebergSourceSplit>
createReader(SourceReaderContext readerContext) {
- InlongIcebergSourceReaderMetrics metrics =
- new
InlongIcebergSourceReaderMetrics(readerContext.metricGroup(),
lazyTable().name());
+ InlongIcebergSourceReaderMetrics<T> metrics =
+ new
InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(),
lazyTable().name());
metrics.registerMetrics(metricOption);
return new IcebergSourceReader<>(metrics, readerFunction,
readerContext);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index beaf321e51..ad3a9b13d4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -39,7 +39,7 @@ public class IcebergSourceReader<T>
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T,
IcebergSourceSplit, IcebergSourceSplit> {
public IcebergSourceReader(
- InlongIcebergSourceReaderMetrics metrics,
+ InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
SourceReaderContext context) {
super(
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
index 146c2dad7e..002ac947a4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
@@ -44,7 +44,7 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
private static final Logger LOG =
LoggerFactory.getLogger(IcebergSourceSplitReader.class);
- private final InlongIcebergSourceReaderMetrics metrics;
+ private final InlongIcebergSourceReaderMetrics<T> metrics;
private final ReaderFunction<T> openSplitFunction;
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
@@ -54,7 +54,7 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
private String currentSplitId;
IcebergSourceSplitReader(
- InlongIcebergSourceReaderMetrics metrics,
+ InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> openSplitFunction,
SourceReaderContext context) {
this.metrics = metrics;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 60cca86ec3..a2431d3fbd 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -19,16 +19,20 @@ package org.apache.inlong.sort.iceberg.source.reader;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
+import java.nio.charset.StandardCharsets;
+
/**
* Inlong iceberg source reader metrics
*/
@Slf4j
-public class InlongIcebergSourceReaderMetrics extends
IcebergSourceReaderMetrics {
+public class InlongIcebergSourceReaderMetrics<T> extends
IcebergSourceReaderMetrics {
private final MetricGroup metrics;
private SourceMetricData sourceMetricData;
@@ -46,10 +50,31 @@ public class InlongIcebergSourceReaderMetrics extends
IcebergSourceReaderMetrics
}
}
- public void outputMetricsWithEstimate(ArrayBatchRecords batchRecord) {
+ public void outputMetricsWithEstimate(ArrayBatchRecords<T> batchRecord) {
if (sourceMetricData != null) {
- sourceMetricData.outputMetricsWithEstimate(batchRecord.records());
+ int dataCount = batchRecord.numberOfRecords();
+ T[] records = batchRecord.records();
+ for (int i = 0; i < dataCount; i++) {
+ long dataSize = getDataSize(records[i]);
+ long dataTime = getDataTime(records[i]);
+ sourceMetricData.outputMetrics(1, dataSize, dataTime);
+ }
+
}
+ }
+ private long getDataTime(T object) {
+ if (object instanceof RecyclableJoinedRowData) {
+ return ((RecyclableJoinedRowData) object).getDataTime();
+ }
+ return System.currentTimeMillis();
+ }
+
+ private long getDataSize(T object) {
+ if (object instanceof RecyclableJoinedRowData) {
+ RowData physical = ((RecyclableJoinedRowData)
object).getPhysicalRowData();
+ return physical.toString().getBytes(StandardCharsets.UTF_8).length;
+ }
+ return object.toString().getBytes(StandardCharsets.UTF_8).length;
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
index 80110cf014..f17186449a 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
@@ -69,6 +69,6 @@ class RowDataRecordFactory implements RecordFactory<RowData> {
RowData physical =
RowDataCloneUtil.clonePhysical(from,
recyclable.getPhysicalRowData(), rowType, fieldSerializers);
RowData meta = RowDataCloneUtil.cloneMeta(from,
recyclable.getMetaRowData(), metadataConverters);
- batch[position] = recyclable.replace(physical.getRowKind(), physical,
meta);
+ batch[position] = recyclable.replace(physical.getRowKind(), physical,
meta, System.currentTimeMillis());
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
index 28327d2d8d..4dcec331d4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
@@ -39,6 +39,7 @@ public class RecyclableJoinedRowData implements RowData {
private RowKind rowKind = RowKind.INSERT;
private RowData physicalRowData;
private RowData metaRowData;
+ private long dataTime;
public RecyclableJoinedRowData() {
}
@@ -48,10 +49,12 @@ public class RecyclableJoinedRowData implements RowData {
metaRowData = new GenericRowData(metaSize);
}
- public RecyclableJoinedRowData replace(RowKind rowKind, RowData
physicalRowData, RowData metaRowData) {
+ public RecyclableJoinedRowData replace(RowKind rowKind, RowData
physicalRowData, RowData metaRowData,
+ long dataTime) {
this.rowKind = rowKind;
this.physicalRowData = physicalRowData;
this.metaRowData = metaRowData;
+ this.dataTime = dataTime;
return this;
}
@@ -63,6 +66,10 @@ public class RecyclableJoinedRowData implements RowData {
return metaRowData;
}
+ public long getDataTime() {
+ return dataTime;
+ }
+
//
---------------------------------------------------------------------------------------------
@Override