This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f3813f0751 [cdc] Correct equalsIgnoreFieldId in
UpdatedDataFieldsProcessFunctionBase
f3813f0751 is described below
commit f3813f075189ecdf2f1fd77e37d7bd32fcd204e5
Author: Jingsong <[email protected]>
AuthorDate: Wed Dec 18 10:59:20 2024 +0800
[cdc] Correct equalsIgnoreFieldId in UpdatedDataFieldsProcessFunctionBase
---
.../java/org/apache/paimon/types/ArrayType.java | 2 +-
.../java/org/apache/paimon/types/DataType.java | 27 ++++++++++++----------
.../main/java/org/apache/paimon/types/MapType.java | 2 +-
.../main/java/org/apache/paimon/types/RowType.java | 3 ++-
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 2 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 8 ++++---
6 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
index 62fb9ce65b..f4c523534e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/ArrayType.java
@@ -101,7 +101,7 @@ public final class ArrayType extends DataType {
}
@Override
- public boolean equalsIgnoreFieldId(Object o) {
+ public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
b/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
index dd9a4685ef..ac1e5b2a64 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataType.java
@@ -23,8 +23,6 @@ import org.apache.paimon.utils.Preconditions;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
@@ -124,15 +122,6 @@ public abstract class DataType implements Serializable {
return copy(isNullable);
}
- /**
- * Compare two data types without nullable.
- *
- * @param o the target data type
- */
- public boolean equalsIgnoreNullable(@Nonnull DataType o) {
- return Objects.equals(this.copy(true), o.copy(true));
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -145,7 +134,21 @@ public abstract class DataType implements Serializable {
return isNullable == that.isNullable && typeRoot == that.typeRoot;
}
- public boolean equalsIgnoreFieldId(Object o) {
+ /**
+ * Compare two data types without nullable.
+ *
+ * @param o the target data type
+ */
+ public boolean equalsIgnoreNullable(DataType o) {
+ return Objects.equals(this.copy(true), o.copy(true));
+ }
+
+ /**
+ * Compare two data types without field id.
+ *
+ * @param o the target data type
+ */
+ public boolean equalsIgnoreFieldId(DataType o) {
return equals(o);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
index b715d49284..75ea5fbb7c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/MapType.java
@@ -110,7 +110,7 @@ public class MapType extends DataType {
}
@Override
- public boolean equalsIgnoreFieldId(Object o) {
+ public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 625a4634b3..fecb5bed9e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -213,7 +213,8 @@ public final class RowType extends DataType {
return fields.equals(rowType.fields);
}
- public boolean equalsIgnoreFieldId(Object o) {
+ @Override
+ public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 504f631058..e143aabf6c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -77,7 +77,7 @@ public class UpdatedDataFieldsProcessFunction
extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
- /**
+ /*
* Here, actualUpdatedDataFields cannot be used to update latestFields
because there is a
* non-SchemaChange.AddColumn scenario. Otherwise, the previously
existing fields cannot be
* modified again.
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index d50df23742..90edbc034a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -219,9 +219,11 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
String newFieldName =
StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
- // we compare by ignoring nullable, because partition keys and
primary keys might be
- // nullable in source database, but they can't be null in
Paimon
- if (oldField.type().equalsIgnoreNullable(newField.type())) {
+ // 1. we compare by ignoring nullable, because partition keys
and primary keys might
+ // be nullable in source database, but they can't be null in
Paimon
+ // 2. we compare by ignoring field id, the field ID is newly
created and may be
+ // different, we should ignore it
+ if
(oldField.type().copy(true).equalsIgnoreFieldId(newField.type().copy(true))) {
// update column comment
if (newField.description() != null
&&
!newField.description().equals(oldField.description())) {