This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e0460acea [INLONG-7539][Sort] StarRocks connector parses row kind from GenericRowData (#7541)
e0460acea is described below
commit e0460acea10d10f376b4f3e91319b0bb9800abea
Author: Liao Rui <[email protected]>
AuthorDate: Wed Mar 8 15:03:12 2023 +0800
[INLONG-7539][Sort] StarRocks connector parses row kind from GenericRowData (#7541)
Co-authored-by: ryanrliao <[email protected]>
---
.../starrocks/manager/StarRocksSinkManager.java | 24 +++++++-----
.../table/sink/StarRocksDynamicSinkFunction.java | 45 +++++++++++++++-------
2 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index c69d28f8a..f55e4e22e 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -522,6 +522,20 @@ public class StarRocksSinkManager implements Serializable {
}
private void handleDirtyData(SinkBufferEntity flushData, Exception e)
throws JsonProcessingException {
+ // upload metrics for dirty data
+ if (null != metricData) {
+ if (multipleSink) {
+ metricData.outputDirtyMetrics(flushData.getDatabase(), flushData.getTable(),
+ flushData.getBatchCount(), flushData.getBatchSize());
+ } else {
+ metricData.invokeDirty(flushData.getBatchCount(), flushData.getBatchSize());
+ }
+ }
+
+ if (!dirtySinkHelper.getDirtyOptions().ignoreDirty()) {
+ return;
+ }
+
// archive dirty data
if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
String columnSeparator = StarRocksDelimiterParser.parse(
@@ -554,16 +568,6 @@ public class StarRocksSinkManager implements Serializable {
e);
}
}
-
- // upload metrics for dirty data
- if (null != metricData) {
- if (multipleSink) {
- metricData.outputDirtyMetrics(flushData.getDatabase(), flushData.getTable(),
- flushData.getBatchCount(), flushData.getBatchSize());
- } else {
- metricData.invokeDirty(flushData.getBatchCount(), flushData.getBatchSize());
- }
- }
}
private void waitAsyncFlushingDone() throws InterruptedException {
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 878912d6f..3b70c35bb 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -279,25 +279,42 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
LOG.warn("Parse dirty options failed. {}", ExceptionUtils.stringifyException(e));
}
- RowKind rowKind = rowData.getRowKind();
+ List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
+ jsonDynamicSchemaFormat.getOpType(rootNode));
List<Map<String, String>> physicalDataList = jsonDynamicSchemaFormat.jsonNode2Map(
jsonDynamicSchemaFormat.getPhysicalData(rootNode));
+ JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+ List<Map<String, String>> updateBeforeList = null;
+ if (updateBeforeNode != null) {
+ updateBeforeList = jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
+ }
List<Map<String, String>> records = new ArrayList<>();
for (int i = 0; i < physicalDataList.size(); i++) {
- Map<String, String> record = physicalDataList.get(i);
- switch (rowKind) {
- case INSERT:
- case UPDATE_AFTER:
- record.put("__op", String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
- break;
- case DELETE:
- case UPDATE_BEFORE:
- record.put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
- break;
- default:
- throw new RuntimeException("Unrecognized row kind:" + rowKind);
+ for (RowKind rowKind : rowKinds) {
+ Map<String, String> record = null;
+ switch (rowKind) {
+ case INSERT:
+ case UPDATE_AFTER:
+ record = physicalDataList.get(i);
+ record.put("__op", String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
+ break;
+ case DELETE:
+ record = physicalDataList.get(i);
+ record.put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
+ break;
+ case UPDATE_BEFORE:
+ if (updateBeforeList != null && updateBeforeList.size() > i) {
+ record = updateBeforeList.get(i);
+ record.put("__op", String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
+ }
+ break;
+ default:
+ throw new RuntimeException("Unrecognized row kind:" + rowKind);
+ }
+ if (record != null) {
+ records.add(record);
+ }
}
- records.add(record);
}
sinkManager.writeRecords(databaseName, tableName, records, dirtyLogTag, dirtyIdentify, dirtyLabel);
} else {