This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 093a10516b8af8620342b32b47daee756db961e0
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:42:57 2023 +0800

    [INLONG-7286][Sort] Fix issue of tableIdentifier being null when addRow (#7287)
---
 .../inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 91f3c67b3..0878124f5 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -313,7 +313,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     }
 
     @Override
-    public synchronized void writeRecord(T row) {
+    public synchronized void writeRecord(T row) throws IOException {
         addBatch(row);
         boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
                 || batchBytes >= executionOptions.getMaxBatchBytes();
@@ -372,7 +372,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     }
 
-    private void addBatch(T row) {
+    private void addBatch(T row) throws IOException {
         readInNum.incrementAndGet();
         if (!multipleSink) {
             addSingle(row);
@@ -441,7 +441,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     @SuppressWarnings({"unchecked"})
     private void addRow(RowKind rowKind, JsonNode rootNode, JsonNode physicalNode, JsonNode updateBeforeNode,
-            Map<String, String> physicalData, Map<String, String> updateBeforeData) {
+            Map<String, String> physicalData, Map<String, String> updateBeforeData) throws IOException {
+        String tableIdentifier = StringUtils.join(
+                jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+                jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
         switch (rowKind) {
             case INSERT:
             case UPDATE_AFTER:
