This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f457d35e4 [core] Support delete row in partial updates (#3602)
f457d35e4 is described below
commit f457d35e460a2d84c8e58b2fa87b31b339b5b302
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Jul 17 16:13:28 2024 +0800
[core] Support delete row in partial updates (#3602)
---
.../merge-engine/partial-update.md | 1 +
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 8 ++
.../compact/PartialUpdateMergeFunction.java | 39 +++++++-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 101 +++++++++++++++++++++
5 files changed, 152 insertions(+), 3 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md
b/docs/content/primary-key-table/merge-engine/partial-update.md
index 70b12618e..09c376c25 100644
--- a/docs/content/primary-key-table/merge-engine/partial-update.md
+++ b/docs/content/primary-key-table/merge-engine/partial-update.md
@@ -49,6 +49,7 @@ but only returns input records.)
By default, Partial update can not accept delete records, you can choose one of the following solutions:
- Configure 'ignore-delete' to ignore delete records.
+- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 89db43b43..19a73cc5d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -485,6 +485,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in parquet.</td>
</tr>
+ <tr>
+ <td><h5>partial-update.remove-record-on-delete</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
+ </tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8864bc3d3..9bb6ab91f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for
primary key table,"
+ " the sequence number determines which
data is the most recent.");
+ @Immutable
+ public static final ConfigOption<Boolean>
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
+ key("partial-update.remove-record-on-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to remove the whole row in partial-update
engine when -D records are received.");
+
@Immutable
public static final ConfigOption<String> ROWKIND_FIELD =
key("rowkind.field")
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 154595859..3013d6ad5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
+import static
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
/**
@@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private final Map<Integer, FieldsComparator> fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
+ private final boolean removeRecordOnDelete;
private InternalRow currentKey;
private long latestSequenceNumber;
@@ -74,12 +77,14 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
boolean ignoreDelete,
Map<Integer, FieldsComparator> fieldSeqComparators,
Map<Integer, FieldAggregator> fieldAggregators,
- boolean fieldSequenceEnabled) {
+ boolean fieldSequenceEnabled,
+ boolean removeRecordOnDelete) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
+ this.removeRecordOnDelete = removeRecordOnDelete;
}
@Override
@@ -106,6 +111,14 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return;
}
+ if (removeRecordOnDelete) {
+ if (kv.valueKind() == RowKind.DELETE) {
+ row = null;
+ }
+ // ignore -U records
+ return;
+ }
+
String msg =
String.join(
"\n",
@@ -235,6 +248,9 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
if (reused == null) {
reused = new KeyValue();
}
+ if (removeRecordOnDelete && row == null) {
+ return null;
+ }
return reused.replace(currentKey, latestSequenceNumber,
RowKind.INSERT, row);
}
@@ -256,6 +272,8 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private final Map<Integer, FieldAggregator> fieldAggregators;
+ private final boolean removeRecordOnDelete;
+
private Factory(Options options, RowType rowType, List<String>
primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
@@ -310,6 +328,19 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}
+
+ removeRecordOnDelete =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+
+ Preconditions.checkState(
+ !(removeRecordOnDelete && ignoreDelete),
+ String.format(
+ "%s and %s have conflicting behavior so should not
be enabled at the same time.",
+ CoreOptions.IGNORE_DELETE,
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+ Preconditions.checkState(
+ !removeRecordOnDelete || fieldSeqComparators.isEmpty(),
+ String.format(
+ "sequence group and %s have conflicting behavior
so should not be enabled at the same time.",
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
}
@Override
@@ -368,14 +399,16 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
- !fieldSeqComparators.isEmpty());
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete);
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
fieldAggregators,
- !fieldSeqComparators.isEmpty());
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index c4867bcac..b74f242e1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -1023,6 +1023,107 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]",
"+I[10, 1]");
}
+ @Test
+ public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "partial-update");
+
options.set("partial-update.remove-record-on-delete", "true");
+ },
+ rowType);
+ Function<InternalRow, String> rowToString = row ->
internalRowToString(row, rowType);
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite("");
+ StreamTableCommit commit = table.newCommit("");
+ // 1. inserts
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.of(1, 1, 1, 1));
+ write.write(GenericRow.of(1, 1, 2, 2));
+ commit.commit(0, write.prepareCommit(true, 0));
+ List<String> result =
+ getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+ // 2. Update Before
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+ commit.commit(1, write.prepareCommit(true, 1));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+ // 3. Update After
+ write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+ commit.commit(2, write.prepareCommit(true, 2));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
+
+ // 4. Retracts
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+ commit.commit(3, write.prepareCommit(true, 3));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).isEmpty();
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testPartialUpdateWithAgg() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "partial-update");
+ options.set("fields.a.sequence-group", "c");
+ options.set("fields.c.aggregate-function", "sum");
+ },
+ rowType);
+ Function<InternalRow, String> rowToString = row ->
internalRowToString(row, rowType);
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite("");
+ StreamTableCommit commit = table.newCommit("");
+ // 1. inserts
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.of(1, 1, 1, 1));
+ write.write(GenericRow.of(1, 1, 2, 2));
+ commit.commit(0, write.prepareCommit(true, 0));
+ List<String> result =
+ getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");
+
+ // 2. Update Before
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+ commit.commit(1, write.prepareCommit(true, 1));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+
+ // 3. Update After
+ write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+ commit.commit(2, write.prepareCommit(true, 2));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");
+
+ // 4. Retracts
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+ commit.commit(3, write.prepareCommit(true, 3));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+ write.close();
+ commit.close();
+ }
+
@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =