This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed9698b683 [core]Improve partial update merge engine configuration
ignore-delete and remove-record-on-sequence-group cannot be enabled at the same
time. (#5609)
ed9698b683 is described below
commit ed9698b683bb3e237bbaf952c36f556d25db793a
Author: Kerwin <[email protected]>
AuthorDate: Tue May 20 10:19:02 2025 +0800
[core]Improve partial update merge engine configuration ignore-delete and
remove-record-on-sequence-group cannot be enabled at the same time. (#5609)
---
.../merge-engine/partial-update.md | 3 +-
.../compact/PartialUpdateMergeFunction.java | 71 ++++++++++++----------
.../apache/paimon/flink/PartialUpdateITCase.java | 37 ++++++++++-
3 files changed, 74 insertions(+), 37 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md
b/docs/content/primary-key-table/merge-engine/partial-update.md
index 2329d72cba..e9dc313fab 100644
--- a/docs/content/primary-key-table/merge-engine/partial-update.md
+++ b/docs/content/primary-key-table/merge-engine/partial-update.md
@@ -50,8 +50,7 @@ By default, Partial update can not accept delete records, you
can choose one of
- Configure 'ignore-delete' to ignore delete records.
- Configure 'partial-update.remove-record-on-delete' to remove the whole row
when receiving delete records.
-- Configure 'sequence-group's to retract partial columns.
- * Configure 'partial-update.remove-record-on-sequence-group' to remove the
whole row when receiving delete records of specified sequence group.
+- Configure 'sequence-group's to retract partial columns. You can also configure
'partial-update.remove-record-on-sequence-group' to remove the whole row when
receiving delete records of the specified sequence group.
{{< /hint >}}
## Sequence Group
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index f061192979..d0633781ca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -161,7 +161,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
+ " you can choose one of the following
solutions:",
"1. Configure 'ignore-delete' to ignore delete
records.",
"2. Configure
'partial-update.remove-record-on-delete' to remove the whole row when receiving
delete records.",
- "3. Configure 'sequence-group's to retract partial
columns.");
+ "3. Configure 'sequence-group's to retract partial
columns. Also configure 'partial-update.remove-record-on-sequence-group' to
remove the whole row when receiving deleted records of `specified sequence
group`.");
throw new IllegalArgumentException(msg);
}
@@ -391,27 +391,26 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private final boolean removeRecordOnDelete;
- private final String removeRecordOnSequenceGroup;
-
private Set<Integer> sequenceGroupPartialDelete;
private Factory(Options options, RowType rowType, List<String>
primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
this.tableTypes = rowType.getFieldTypes();
- this.removeRecordOnSequenceGroup =
+ this.removeRecordOnDelete =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+ String removeRecordOnSequenceGroup =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
this.sequenceGroupPartialDelete = new HashSet<>();
List<String> fieldNames = rowType.getFieldNames();
this.fieldSeqComparators = new HashMap<>();
Map<String, Integer> sequenceGroupMap = new HashMap<>();
- List<String> allSequenceFields = new ArrayList<>();
+ List<Integer> allSequenceFields = new ArrayList<>();
for (Map.Entry<String, String> entry : options.toMap().entrySet())
{
String k = entry.getKey();
String v = entry.getValue();
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP))
{
- List<String> sequenceFields =
+ int[] sequenceFields =
Arrays.stream(
k.substring(
FIELDS_PREFIX.length() + 1,
@@ -419,17 +418,13 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
-
SEQUENCE_GROUP.length()
- 1)
.split(FIELDS_SEPARATOR))
- .map(fieldName ->
validateFieldName(fieldName, fieldNames))
- .collect(Collectors.toList());
- allSequenceFields.addAll(sequenceFields);
+ .mapToInt(fieldName ->
requireField(fieldName, fieldNames))
+ .toArray();
Supplier<FieldsComparator> userDefinedSeqComparator =
() -> UserDefinedSeqComparator.create(rowType,
sequenceFields, true);
Arrays.stream(v.split(FIELDS_SEPARATOR))
- .map(
- fieldName ->
- fieldNames.indexOf(
-
validateFieldName(fieldName, fieldNames)))
+ .map(fieldName -> requireField(fieldName,
fieldNames))
.forEach(
field -> {
if
(fieldSeqComparators.containsKey(field)) {
@@ -442,42 +437,54 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
});
// add self
- sequenceFields.forEach(
- fieldName -> {
- int index = fieldNames.indexOf(fieldName);
- fieldSeqComparators.put(index,
userDefinedSeqComparator);
- sequenceGroupMap.put(fieldName, index);
- });
+ for (int index : sequenceFields) {
+ allSequenceFields.add(index);
+ String fieldName = fieldNames.get(index);
+ fieldSeqComparators.put(index,
userDefinedSeqComparator);
+ sequenceGroupMap.put(fieldName, index);
+ }
}
}
this.fieldAggregators =
createFieldAggregators(
rowType, primaryKeys, allSequenceFields, new
CoreOptions(options));
- removeRecordOnDelete =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
-
+ // check if partial-update.remove-record-on-delete and
ignore-delete are enabled at the same time.
Preconditions.checkState(
!(removeRecordOnDelete && ignoreDelete),
String.format(
"%s and %s have conflicting behavior so should not
be enabled at the same time.",
- CoreOptions.IGNORE_DELETE,
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+ CoreOptions.IGNORE_DELETE.key(),
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key()));
+
+ // check if partial-update.remove-record-on-sequence-group and
ignore-delete are enabled
+ // at the same time.
+ Preconditions.checkState(
+ !(removeRecordOnSequenceGroup != null && ignoreDelete),
+ String.format(
+ "%s and %s have conflicting behavior so should not
be enabled at the same time.",
+ CoreOptions.IGNORE_DELETE.key(),
+
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
+
+ // check if partial-update.remove-record-on-delete and
sequence-group are enabled at the
+ // same time.
Preconditions.checkState(
!removeRecordOnDelete || fieldSeqComparators.isEmpty(),
String.format(
- "sequence group and %s have conflicting behavior
so should not be enabled at the same time.",
- PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+ "%s and %s have conflicting behavior so should not
be enabled at the same time.",
+ SEQUENCE_GROUP,
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key()));
if (removeRecordOnSequenceGroup != null) {
- String[] sequenceGroupArr =
removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR);
+ List<String> sequenceGroupFields =
+
Arrays.asList(removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR));
Preconditions.checkState(
-
sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)),
+
sequenceGroupMap.keySet().containsAll(sequenceGroupFields),
String.format(
"field '%s' defined in '%s' option must be
part of sequence groups",
removeRecordOnSequenceGroup,
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
sequenceGroupPartialDelete =
- Arrays.stream(sequenceGroupArr)
- .filter(sequenceGroupMap::containsKey)
+ sequenceGroupFields.stream()
.map(sequenceGroupMap::get)
.collect(Collectors.toSet());
}
@@ -608,14 +615,14 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return new AdjustedProjection(pushDown, outer);
}
- private String validateFieldName(String fieldName, List<String>
fieldNames) {
+ private int requireField(String fieldName, List<String> fieldNames) {
int field = fieldNames.indexOf(fieldName);
if (field == -1) {
throw new IllegalArgumentException(
String.format("Field %s can not be found in table
schema", fieldName));
}
- return fieldName;
+ return field;
}
/**
@@ -626,7 +633,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
RowType rowType,
List<String> primaryKeys,
- List<String> allSequenceFields,
+ List<Integer> allSequenceFields,
CoreOptions options) {
List<String> fieldNames = rowType.getFieldNames();
@@ -636,7 +643,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
String fieldName = fieldNames.get(i);
DataType fieldType = fieldTypes.get(i);
- if (allSequenceFields.contains(fieldName)) {
+ if (allSequenceFields.contains(i)) {
// no agg for sequence fields
continue;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index d5f1f7bbea..2da837d2f4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -623,7 +623,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
}
@Test
- public void testRemoveRecordOnDeleteWithoutSequenceGroup() {
+ public void testRemoveRecordOnDeleteWithoutSequenceGroup() throws
Exception {
sql(
"CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT
ENFORCED, a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
@@ -645,6 +645,19 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
// batch read
assertThat(sql("SELECT * FROM remove_record_on_delete"))
.containsExactlyInAnyOrder(Row.of(1, "A", "apache"));
+
+ // delete record with changelog stream
+ String id =
+ TestValuesTableFactory.registerData(
+ Collections.singletonList(Row.ofKind(RowKind.DELETE,
1, "A", null)));
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE delete_source1 (pk INT, a
STRING, b STRING) "
+ + "WITH ('connector'='values',
'bounded'='true', 'data-id'='%s', "
+ + "'changelog-mode' = 'I,D,UA,UB')",
+ id));
+ sEnv.executeSql("INSERT INTO remove_record_on_delete SELECT * FROM
delete_source1").await();
+ assertThat(sql("SELECT * FROM remove_record_on_delete")).isEmpty();
}
@Test
@@ -680,8 +693,26 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
.containsExactlyInAnyOrder(Row.of(1, "apple", 2, null, 2));
- // delete record
- sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 1");
+ // delete record with seq_a
+ String id2 =
+ TestValuesTableFactory.registerData(
+ Collections.singletonList(
+ Row.ofKind(RowKind.DELETE, 1, "apple", 2,
null, null)));
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE delete_source2 (pk INT, a
STRING, seq_a INT, b STRING, seq_b INT) "
+ + "WITH ('connector'='values',
'bounded'='true', 'data-id'='%s', "
+ + "'changelog-mode' = 'I,D,UA,UB')",
+ id2));
+ sEnv.executeSql(
+ "INSERT INTO remove_record_on_delete_sequence_group
SELECT * FROM delete_source2")
+ .await();
+ assertThat(sql("SELECT * FROM
remove_record_on_delete_sequence_group")).isEmpty();
+
+ // batch delete record
+ sql(
+ "INSERT INTO remove_record_on_delete_sequence_group VALUES (2,
'flink', 2, 'paimon', 1)");
+ sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 2");
assertThat(sql("SELECT * FROM
remove_record_on_delete_sequence_group")).isEmpty();
}