This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95260e20a [INLONG-7286][Sort] Fix issue of tableidentifier being null
when addRow (#7287)
95260e20a is described below
commit 95260e20ad643496c0ab9291122419475ae5889c
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:42:57 2023 +0800
[INLONG-7286][Sort] Fix issue of tableidentifier being null when addRow
(#7287)
---
.../inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 91f3c67b3..0878124f5 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -313,7 +313,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
@Override
- public synchronized void writeRecord(T row) {
+ public synchronized void writeRecord(T row) throws IOException {
addBatch(row);
boolean valid = (executionOptions.getBatchSize() > 0 && size >=
executionOptions.getBatchSize())
|| batchBytes >= executionOptions.getMaxBatchBytes();
@@ -372,7 +372,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
- private void addBatch(T row) {
+ private void addBatch(T row) throws IOException {
readInNum.incrementAndGet();
if (!multipleSink) {
addSingle(row);
@@ -441,7 +441,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
@SuppressWarnings({"unchecked"})
private void addRow(RowKind rowKind, JsonNode rootNode, JsonNode
physicalNode, JsonNode updateBeforeNode,
- Map<String, String> physicalData, Map<String, String>
updateBeforeData) {
+ Map<String, String> physicalData, Map<String, String>
updateBeforeData) throws IOException {
+ String tableIdentifier = StringUtils.join(
+ jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
+ jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
switch (rowKind) {
case INSERT:
case UPDATE_AFTER: