This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 093a10516b8af8620342b32b47daee756db961e0
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:42:57 2023 +0800

    [INLONG-7286][Sort] Fix issue of tableIdentifier being null when addRow (#7287)
---
 .../inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 91f3c67b3..0878124f5 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -313,7 +313,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     }
 
     @Override
-    public synchronized void writeRecord(T row) {
+    public synchronized void writeRecord(T row) throws IOException {
         addBatch(row);
         boolean valid = (executionOptions.getBatchSize() > 0 && size >= 
executionOptions.getBatchSize())
                 || batchBytes >= executionOptions.getMaxBatchBytes();
@@ -372,7 +372,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     }
 
-    private void addBatch(T row) {
+    private void addBatch(T row) throws IOException {
         readInNum.incrementAndGet();
         if (!multipleSink) {
             addSingle(row);
@@ -441,7 +441,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     @SuppressWarnings({"unchecked"})
     private void addRow(RowKind rowKind, JsonNode rootNode, JsonNode 
physicalNode, JsonNode updateBeforeNode,
-            Map<String, String> physicalData, Map<String, String> 
updateBeforeData) {
+            Map<String, String> physicalData, Map<String, String> 
updateBeforeData) throws IOException {
+        String tableIdentifier = StringUtils.join(
+                jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+                jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
         switch (rowKind) {
             case INSERT:
             case UPDATE_AFTER:

Reply via email to