This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e0460acea [INLONG-7539][Sort] StarRocks connector parses row kind from 
GenericRowData (#7541)
e0460acea is described below

commit e0460acea10d10f376b4f3e91319b0bb9800abea
Author: Liao Rui <[email protected]>
AuthorDate: Wed Mar 8 15:03:12 2023 +0800

    [INLONG-7539][Sort] StarRocks connector parses row kind from GenericRowData 
(#7541)
    
    Co-authored-by: ryanrliao <[email protected]>
---
 .../starrocks/manager/StarRocksSinkManager.java    | 24 +++++++-----
 .../table/sink/StarRocksDynamicSinkFunction.java   | 45 +++++++++++++++-------
 2 files changed, 45 insertions(+), 24 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index c69d28f8a..f55e4e22e 100644
--- 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -522,6 +522,20 @@ public class StarRocksSinkManager implements Serializable {
     }
 
     private void handleDirtyData(SinkBufferEntity flushData, Exception e) 
throws JsonProcessingException {
+        // upload metrics for dirty data
+        if (null != metricData) {
+            if (multipleSink) {
+                metricData.outputDirtyMetrics(flushData.getDatabase(), 
flushData.getTable(),
+                        flushData.getBatchCount(), flushData.getBatchSize());
+            } else {
+                metricData.invokeDirty(flushData.getBatchCount(), 
flushData.getBatchSize());
+            }
+        }
+
+        if (!dirtySinkHelper.getDirtyOptions().ignoreDirty()) {
+            return;
+        }
+
         // archive dirty data
         if 
(StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))
 {
             String columnSeparator = StarRocksDelimiterParser.parse(
@@ -554,16 +568,6 @@ public class StarRocksSinkManager implements Serializable {
                         e);
             }
         }
-
-        // upload metrics for dirty data
-        if (null != metricData) {
-            if (multipleSink) {
-                metricData.outputDirtyMetrics(flushData.getDatabase(), 
flushData.getTable(),
-                        flushData.getBatchCount(), flushData.getBatchSize());
-            } else {
-                metricData.invokeDirty(flushData.getBatchCount(), 
flushData.getBatchSize());
-            }
-        }
     }
 
     private void waitAsyncFlushingDone() throws InterruptedException {
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 878912d6f..3b70c35bb 100644
--- 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -279,25 +279,42 @@ public class StarRocksDynamicSinkFunction<T> extends 
RichSinkFunction<T> impleme
                 LOG.warn("Parse dirty options failed. {}", 
ExceptionUtils.stringifyException(e));
             }
 
-            RowKind rowKind = rowData.getRowKind();
+            List<RowKind> rowKinds = jsonDynamicSchemaFormat.opType2RowKind(
+                    jsonDynamicSchemaFormat.getOpType(rootNode));
             List<Map<String, String>> physicalDataList = 
jsonDynamicSchemaFormat.jsonNode2Map(
                     jsonDynamicSchemaFormat.getPhysicalData(rootNode));
+            JsonNode updateBeforeNode = 
jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+            List<Map<String, String>> updateBeforeList = null;
+            if (updateBeforeNode != null) {
+                updateBeforeList = 
jsonDynamicSchemaFormat.jsonNode2Map(updateBeforeNode);
+            }
             List<Map<String, String>> records = new ArrayList<>();
             for (int i = 0; i < physicalDataList.size(); i++) {
-                Map<String, String> record = physicalDataList.get(i);
-                switch (rowKind) {
-                    case INSERT:
-                    case UPDATE_AFTER:
-                        record.put("__op", 
String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
-                        break;
-                    case DELETE:
-                    case UPDATE_BEFORE:
-                        record.put("__op", 
String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
-                        break;
-                    default:
-                        throw new RuntimeException("Unrecognized row kind:" + 
rowKind);
+                for (RowKind rowKind : rowKinds) {
+                    Map<String, String> record = null;
+                    switch (rowKind) {
+                        case INSERT:
+                        case UPDATE_AFTER:
+                            record = physicalDataList.get(i);
+                            record.put("__op", 
String.valueOf(StarRocksSinkOP.UPSERT.ordinal()));
+                            break;
+                        case DELETE:
+                            record = physicalDataList.get(i);
+                            record.put("__op", 
String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
+                            break;
+                        case UPDATE_BEFORE:
+                            if (updateBeforeList != null && 
updateBeforeList.size() > i) {
+                                record = updateBeforeList.get(i);
+                                record.put("__op", 
String.valueOf(StarRocksSinkOP.DELETE.ordinal()));
+                            }
+                            break;
+                        default:
+                            throw new RuntimeException("Unrecognized row 
kind:" + rowKind);
+                    }
+                    if (record != null) {
+                        records.add(record);
+                    }
                 }
-                records.add(record);
             }
             sinkManager.writeRecords(databaseName, tableName, records, 
dirtyLogTag, dirtyIdentify, dirtyLabel);
         } else {

Reply via email to