This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 37e8fb02e [common] supports configuring force-lookup. (#3905)
37e8fb02e is described below
commit 37e8fb02e7ef185b74765ca07486e977e2893583
Author: liming.1018 <[email protected]>
AuthorDate: Wed Aug 7 19:02:30 2024 +0800
[common] supports configuring force-lookup. (#3905)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++-
.../org/apache/paimon/lookup/LookupStrategy.java | 15 +++--
.../LookupChangelogMergeFunctionWrapperTest.java | 6 +-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 64 ++++++++++++++++++++++
.../paimon/flink/sink/AsyncLookupSinkWrite.java | 2 +-
6 files changed, 94 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 784719d76..b0497b345 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -345,6 +345,12 @@ Mainly to resolve data skew on primary keys. We recommend starting with 64 mb wh
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
</tr>
+ <tr>
+ <td><h5>force-lookup</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force the use of lookup for compaction.</td>
+ </tr>
<tr>
<td><h5>lookup-wait</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index bd10ff000..aa4bcb685 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1258,6 +1258,13 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("Specifies the commit user prefix.");
+ /**
+ * Forces lookup compaction even when no other setting (first-row merge engine, lookup changelog
+ * producer, deletion vectors) requires it.
+ */
+ @Immutable
+ public static final ConfigOption<Boolean> FORCE_LOOKUP =
+ key("force-lookup")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to force the use of lookup for compaction.");
+
public static final ConfigOption<Boolean> LOOKUP_WAIT =
key("lookup-wait")
.booleanType()
@@ -1705,7 +1712,8 @@ public class CoreOptions implements Serializable {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
changelogProducer().equals(ChangelogProducer.LOOKUP),
- deletionVectorsEnabled());
+ deletionVectorsEnabled(),
+ options.get(FORCE_LOOKUP));
}
public boolean changelogRowDeduplicate() {
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
index 24e03ebdb..f01c7c967 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
@@ -29,15 +29,22 @@ public class LookupStrategy {
public final boolean deletionVector;
- private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
+ /**
+ * @param isFirstRow whether the table uses the first-row merge engine
+ * @param produceChangelog whether the changelog producer is lookup
+ * @param deletionVector whether deletion vectors are enabled
+ * @param forceLookup whether lookup is forced regardless of the other flags; it only feeds
+ *     {@code needLookup}
+ */
+ private LookupStrategy(
+ boolean isFirstRow,
+ boolean produceChangelog,
+ boolean deletionVector,
+ boolean forceLookup) {
this.isFirstRow = isFirstRow;
this.produceChangelog = produceChangelog;
this.deletionVector = deletionVector;
- this.needLookup = produceChangelog || deletionVector || isFirstRow;
+ // Any single reason is enough to require lookup.
+ this.needLookup = produceChangelog || deletionVector || isFirstRow || forceLookup;
}
+ /** Creates a strategy whose {@code needLookup} is true when any of the given flags is set. */
public static LookupStrategy from(
- boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
- return new LookupStrategy(isFirstRow, produceChangelog, deletionVector);
+ boolean isFirstRow,
+ boolean produceChangelog,
+ boolean deletionVector,
+ boolean forceLookup) {
+ return new LookupStrategy(isFirstRow, produceChangelog, deletionVector, forceLookup);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index dbee0805a..e2d37eae4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -73,7 +73,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
highLevel::get,
EQUALISER,
changelogRowDeduplicate,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
null);
@@ -233,7 +233,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
key -> null,
EQUALISER,
changelogRowDeduplicate,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
null);
@@ -322,7 +322,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
highLevel::get,
EQUALISER,
false,
- LookupStrategy.from(false, true, false),
+ LookupStrategy.from(false, true, false, false),
null,
UserDefinedSeqComparator.create(
RowType.builder().field("f0",
DataTypes.INT()).build(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 09bd6eea1..e8ece0779 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -29,6 +29,8 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -65,9 +67,11 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CompatibilityTestUtils;
+import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -95,7 +99,11 @@ import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine;
+import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
@@ -1670,6 +1678,62 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
// table-path/changelog/EARLIEST
}
+ /**
+ * With {@code force-lookup} enabled, every merge engine must compact eagerly. After the second
+ * commit the latest snapshot is a COMPACT snapshot, and the two data files written for the same
+ * key are merged into one.
+ */
+ @ParameterizedTest
+ @EnumSource(CoreOptions.MergeEngine.class)
+ public void testForceLookupCompaction(CoreOptions.MergeEngine mergeEngine) throws Exception {
+ // (first value, second value) written to the same key; each pair is expected to merge to 100
+ // under its engine (checked by the final assertion), e.g. AGGREGATE sums 30 + 70.
+ Map<MergeEngine, Pair<Long, Long>> testData = new HashMap<>();
+ testData.put(DEDUPLICATE, Pair.of(50L, 100L));
+ testData.put(PARTIAL_UPDATE, Pair.of(null, 100L));
+ testData.put(AGGREGATE, Pair.of(30L, 70L));
+ testData.put(FIRST_ROW, Pair.of(100L, 70L));
+
+ Pair<Long, Long> currentTestData = testData.get(mergeEngine);
+ // Fail with a clear message rather than an NPE if a new merge engine is added.
+ assertThat(currentTestData).as("no test data for merge engine %s", mergeEngine).isNotNull();
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(CoreOptions.FORCE_LOOKUP, true);
+ options.set(MERGE_ENGINE, mergeEngine);
+ if (mergeEngine == AGGREGATE) {
+ options.set("fields.b.aggregate-function", "sum");
+ }
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ write.withIOManager(IOManager.create(tempDir.toString()));
+
+ // write data; commit identifiers increase with every commit of the same commit user
+ write.write(rowData(1, 10, currentTestData.getLeft()));
+ commit.commit(1, write.prepareCommit(true, 1));
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2L);
+
+ write.write(rowData(1, 10, currentTestData.getRight()));
+ commit.commit(2, write.prepareCommit(true, 2));
+ write.close();
+ commit.close();
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(4L);
+ assertThat(table.snapshotManager().latestSnapshot())
+ .matches(snapshot -> snapshot.commitKind() == COMPACT);
+
+ // 3 data files + bucket-0 directory (Files.walk also yields the start directory).
+ // The walk stream holds open directory handles, so it must be closed.
+ List<java.nio.file.Path> files;
+ try (java.util.stream.Stream<java.nio.file.Path> paths =
+ Files.walk(new File(tablePath.toUri().getPath(), "pt=1/bucket-0").toPath())) {
+ files = paths.collect(Collectors.toList());
+ }
+ assertThat(files.size()).isEqualTo(4);
+
+ // 2 data files compact into 1 file; plan the delta scan once and inspect both kinds
+ FileStoreScan scan = table.store().newScan().withKind(ScanMode.DELTA);
+ FileStoreScan.Plan plan = scan.plan();
+ assertThat(plan.files(FileKind.ADD).size()).isEqualTo(1);
+ assertThat(plan.files(FileKind.DELETE).size()).isEqualTo(2);
+
+ // check result
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newRead();
+ assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
+ .isEqualTo(
+ Collections.singletonList(
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
private void assertReadChangelog(int id, FileStoreTable table) throws
Exception {
// read the changelog at #{id}
table =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
index 9c814463c..fcb4c89eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -34,7 +34,7 @@ import java.util.Map;
/**
* {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
- * org.apache.paimon.CoreOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} set to false.
+ * org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
*/
public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {