This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c67f2b708 [INLONG-7455][Sort] Fix dirty data archival format issues in Iceberg connector (#7456)
c67f2b708 is described below
commit c67f2b7086154f6e1d100c0e1987fcffdcb72e1a
Author: LinChen <[email protected]>
AuthorDate: Mon Mar 6 09:58:53 2023 +0800
[INLONG-7455][Sort] Fix dirty data archival format issues in Iceberg connector (#7456)
---
.../iceberg/sink/multiple/DynamicSchemaHandleOperator.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index bc2892163..b95278223 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -304,7 +304,17 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
} catch (Exception e) {
LOG.warn("Ignore table {} schema change, old: {} new: {}.",
tableId, dataSchema, latestSchema, e);
- handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
+ try {
+ List<RowData> rowDataForDataSchemaList =
+ dynamicSchemaFormat.extractRowData(jsonNode,
+ FlinkSchemaUtil.convert(dataSchema));
+ for (RowData rowData : rowDataForDataSchemaList) {
+ handleDirtyData(rowData.toString(), jsonNode,
+ DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
+ }
+ } catch (Exception ee) {
+ LOG.error("handleDirtyData {} failed!", jsonNode);
+ }
}
return Collections.emptyList();
});