This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new a1421af7d [INLONG-6149][Sort] Iceberg delete key causes ArrayIndexOutOfBoundsException (#6150)
a1421af7d is described below
commit a1421af7dd42a668267c5b0c5e3de206510493fb
Author: thesumery <[email protected]>
AuthorDate: Wed Oct 12 14:21:03 2022 +0800
[INLONG-6149][Sort] Iceberg delete key causes ArrayIndexOutOfBoundsException (#6150)
---
.../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java | 10 +++++++++-
.../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
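Before the diffs, a note on what the key change does: instead of handing the equality-delete writer the full table schema, the patch projects the schema down to the equality fields with TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)). A minimal, self-contained sketch of that projection, using an illustrative schema that is not taken from the patch:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    import java.util.List;

    public class EqualityDeleteSchemaSketch {
        public static void main(String[] args) {
            // Hypothetical table schema; "id" stands in for the primary key
            // (equality) field of a real table.
            Schema schema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()));
            List<Integer> equalityFieldIds = List.of(1);

            // The same call the patch adds: select only the equality fields,
            // so the delete writer's schema matches the key-projected rows
            // it is actually given in upsert mode.
            Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
            System.out.println(deleteSchema); // prints a schema containing only "id"
        }
    }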
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 36240760f..84aaf2af1 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;

 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
         if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else if (upsert) {
+            // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted
+            // row may differ from the deleted row other than the primary key fields, and the delete file must contain
+            // values that are correct for the deleted row. Therefore, only write the equality delete fields.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds),
+                    TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
         } else {
-            // TODO provide the ability to customize the equality-delete row schema.
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
                     ArrayUtil.toIntArray(equalityFieldIds), schema, null);
         }
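The second connector gets the identical fix below. For context on the exception in the title: in upsert mode the delta writer emits an equality delete from a row projected down to the key fields, while the appender used to be configured with the full row schema, so the underlying writer indexed past the projected row's arity. A simplified, hypothetical illustration of that mismatch (not the actual writer internals):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;

    public class ArityMismatchSketch {
        public static void main(String[] args) {
            // A delete "key" row projected down to a single primary-key column.
            RowData keyOnly = GenericRowData.of(42L);

            // A writer configured against a full three-column schema will ask
            // for positions the projected row does not have; position 1 already
            // throws the ArrayIndexOutOfBoundsException the patch avoids.
            int fullSchemaArity = 3;
            for (int pos = 0; pos < fullSchemaArity; pos++) {
                System.out.println(keyOnly.isNullAt(pos));
            }
        }
    }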
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 831ef6a4a..aa724ac7f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;

 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
         if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else if (upsert) {
+            // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted
+            // row may differ from the deleted row other than the primary key fields, and the delete file must contain
+            // values that are correct for the deleted row. Therefore, only write the equality delete fields.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds),
+                    TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
         } else {
-            // TODO provide the ability to customize the equality-delete row schema.
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
                     ArrayUtil.toIntArray(equalityFieldIds), schema, null);
         }