This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ff99bec75 [core] Support drop delete in rewriteChangelogCompaction (#2875)
ff99bec75 is described below
commit ff99bec75d3687fdbe0b45efe8fe09eba10516e0
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 22 13:27:16 2024 +0800
[core] Support drop delete in rewriteChangelogCompaction (#2875)
---
.../apache/paimon/mergetree/DropDeleteReader.java | 8 ++---
.../compact/ChangelogMergeTreeRewriter.java | 14 +++++---
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 39 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
index 7bb50d177..980a6a1c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
@@ -27,8 +27,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
/**
- * A {@link RecordReader} which drops {@link KeyValue} of {@link
RowKind#DELETE} kind from the
- * wrapped reader.
+ * A {@link RecordReader} which drops {@link KeyValue} that does not meet
{@link RowKind#isAdd} from
+ * the wrapped reader.
*/
public class DropDeleteReader implements RecordReader<KeyValue> {
@@ -55,9 +55,7 @@ public class DropDeleteReader implements RecordReader<KeyValue> {
if (kv == null) {
return null;
}
-
- if (kv.valueKind() == RowKind.INSERT
- || kv.valueKind() == RowKind.UPDATE_AFTER) {
+ if (kv.isAdd()) {
return kv;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 7be5fc741..c57087eea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -93,7 +93,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
public CompactResult rewrite(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) throws Exception {
if (rewriteChangelog(outputLevel, dropDelete, sections)) {
- return rewriteChangelogCompaction(outputLevel, sections, true);
+ return rewriteChangelogCompaction(outputLevel, sections,
dropDelete, true);
} else {
return rewriteCompaction(outputLevel, dropDelete, sections);
}
@@ -102,10 +102,14 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
/**
* Rewrite and produce changelog at the same time.
*
+ * @param dropDelete whether to drop delete when rewrite compact file
* @param rewriteCompactFile whether to rewrite compact file
*/
private CompactResult rewriteChangelogCompaction(
- int outputLevel, List<List<SortedRun>> sections, boolean
rewriteCompactFile)
+ int outputLevel,
+ List<List<SortedRun>> sections,
+ boolean dropDelete,
+ boolean rewriteCompactFile)
throws Exception {
List<ConcatRecordReader.ReaderSupplier<ChangelogResult>>
sectionReaders = new ArrayList<>();
for (List<SortedRun> section : sections) {
@@ -132,8 +136,9 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
- if (rewriteCompactFile && result.result() != null) {
- compactFileWriter.write(result.result());
+ KeyValue keyValue = result.result();
+ if (rewriteCompactFile && keyValue != null && (!dropDelete ||
keyValue.isAdd())) {
+ compactFileWriter.write(keyValue);
}
for (KeyValue kv : result.changelogs()) {
changelogFileWriter.write(kv);
@@ -170,6 +175,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
+ false,
strategy.rewrite);
} else {
return super.upgrade(outputLevel, file);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 2412e4b7c..2bd3fe1a4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -80,6 +81,7 @@ import java.util.function.Function;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1233,6 +1235,43 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
innerTestTableQuery(table);
}
+ @Test
+ public void testLookupWithDropDelete() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CHANGELOG_PRODUCER, LOOKUP);
+ conf.set("num-levels", "2");
+ });
+ IOManager ioManager = IOManager.create(tablePath.toString());
+ StreamTableWrite write =
table.newWrite(commitUser).withIOManager(ioManager);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ write.write(rowData(1, 1, 100L));
+ write.write(rowData(1, 2, 200L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ // set num-levels = 2 to make sure that this delete can trigger
compaction with drop delete
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 1, 100L));
+ commit.commit(1, write.prepareCommit(true, 0));
+ write.close();
+ commit.close();
+
+ Snapshot latestSnapshot =
table.newSnapshotReader().snapshotManager().latestSnapshot();
+ assertThat(latestSnapshot.commitKind()).isEqualTo(COMPACT);
+ assertThat(latestSnapshot.totalRecordCount()).isEqualTo(1);
+
+ assertThat(
+ getResult(
+ table.newRead(),
+
toSplits(table.newSnapshotReader().read().dataSplits()),
+ binaryRow(1),
+ 0,
+ BATCH_ROW_TO_STRING))
+ .isEqualTo(
+ Collections.singletonList(
+
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
private void innerTestTableQuery(FileStoreTable table) throws Exception {
IOManager ioManager = IOManager.create(tablePath.toString());
StreamTableWrite write =
table.newWrite(commitUser).withIOManager(ioManager);