This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5b29cc0d57 [INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549)
5b29cc0d57 is described below
commit 5b29cc0d57d9cf83a7acc079d2579317d4ff7f6b
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 2 18:17:36 2024 +0800
[INLONG-10357][Sort] Make StarRocks sink support report audit information exactly once (#10549)
---
.../sink/table/StarRocksDynamicSinkFunctionV2.java | 49 +++++++++++++++-------
.../starrocks/table/sink/utils/SchemaUtils.java | 6 +--
2 files changed, 36 insertions(+), 19 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 9df5f0e422..b4e915eec9 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.starrocks.table.sink.table;
import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils;
import com.google.common.base.Strings;
@@ -70,6 +70,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
/**
* StarRocks dynamic sink function. It supports insert, upsert, delete in
Starrocks.
* @param <T>
@@ -87,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
private transient volatile ListState<StarrocksSnapshotState>
snapshotStates;
private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new
ConcurrentHashMap<>();
- private transient SinkMetricData sinkMetricData;
+ private transient SinkExactlyMetric sinkExactlyMetric;
@Deprecated
private transient ListState<Map<String, StarRocksSinkBufferEntity>>
legacyState;
@@ -207,19 +209,18 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
Object[] data = rowTransformer.transform(value,
sinkOptions.supportUpsertDelete());
- ouputMetrics(value, data);
-
sinkManager.write(
null,
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
serializer.serialize(schemaUtils.filterOutTimeField(data)));
+ ouputMetrics(value, data);
}
private void ouputMetrics(T value, Object[] data) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeWithEstimate(value,
schemaUtils.getDataTime(data));
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.invoke(1, getDataSize(value),
schemaUtils.getDataTime(data));
}
}
@@ -237,10 +238,10 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
.build();
if (metricOption != null) {
- sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ sinkExactlyMetric = new SinkExactlyMetric(metricOption,
getRuntimeContext().getMetricGroup());
}
- notifyCheckpointComplete(Long.MAX_VALUE);
+ commitTransaction(Long.MAX_VALUE);
log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
}
@@ -265,10 +266,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
+ updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
sinkManager.flush();
- flushAudit();
-
if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
return;
}
@@ -290,12 +290,6 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
}
}
- private void flushAudit() {
- if (sinkMetricData != null) {
- sinkMetricData.flushAuditData();
- }
- }
-
@Override
public void initializeState(FunctionInitializationContext
functionInitializationContext) throws Exception {
if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
@@ -346,7 +340,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
@Override
public void notifyCheckpointComplete(long checkpointId) {
+ commitTransaction(checkpointId);
+ flushAudit();
+ updateLastCheckpointId(checkpointId);
+ }
+ private void commitTransaction(long checkpointId) {
boolean succeed = true;
List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
@@ -395,4 +394,22 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
legacyData.clear();
}
+ private void flushAudit() {
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.flushAudit();
+ }
+ }
+
+ private void updateCurrentCheckpointId(long checkpointId) {
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ private void updateLastCheckpointId(long checkpointId) {
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
index 76e91e6cf3..a2adb72091 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable {
private static final long serialVersionUID = 1L;
- private final String AUDIT_DATA_TIME = "audit_data_time";
+ private final String AUDIT_DATA_TIME = "AUDIT_DATA_TIME";
private final int DATA_TIME_ABSENT_INDEX = -1;
private final int dataTimeFieldIndex;
@@ -73,7 +73,7 @@ public class SchemaUtils implements Serializable {
*/
public String[] filterOutTimeField(TableSchema schema) {
return Arrays.stream(schema.getFieldNames())
- .filter(field -> !AUDIT_DATA_TIME.equals(field))
+ .filter(field -> !AUDIT_DATA_TIME.equalsIgnoreCase(field))
.toArray(String[]::new);
}
@@ -84,7 +84,7 @@ public class SchemaUtils implements Serializable {
*/
private int getDataTimeIndex(String[] fieldNames) {
for (int i = 0; i < fieldNames.length; i++) {
- if (AUDIT_DATA_TIME.equals(fieldNames[i])) {
+ if (AUDIT_DATA_TIME.equalsIgnoreCase(fieldNames[i])) {
return i;
}
}