This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c67f2b708 [INLONG-7455][Sort] Fix dirty data archival format issues in 
Iceberg connector (#7456)
c67f2b708 is described below

commit c67f2b7086154f6e1d100c0e1987fcffdcb72e1a
Author: LinChen <[email protected]>
AuthorDate: Mon Mar 6 09:58:53 2023 +0800

    [INLONG-7455][Sort] Fix dirty data archival format issues in Iceberg 
connector (#7456)
---
 .../iceberg/sink/multiple/DynamicSchemaHandleOperator.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index bc2892163..b95278223 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -304,7 +304,17 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                             } catch (Exception e) {
                                 LOG.warn("Ignore table {} schema change, old: 
{} new: {}.",
                                         tableId, dataSchema, latestSchema, e);
-                                handleDirtyData(jsonNode, jsonNode, 
DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
+                                try {
+                                    List<RowData> rowDataForDataSchemaList =
+                                            
dynamicSchemaFormat.extractRowData(jsonNode,
+                                                    
FlinkSchemaUtil.convert(dataSchema));
+                                    for (RowData rowData : 
rowDataForDataSchemaList) {
+                                        handleDirtyData(rowData.toString(), 
jsonNode,
+                                                
DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
+                                    }
+                                } catch (Exception ee) {
+                                    LOG.error("handleDirtyData {} failed!", 
jsonNode);
+                                }
                             }
                             return Collections.emptyList();
                         });

Reply via email to