This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 531f6ebb9 [core] Unify contains classes into lookup for first row (#2915)
531f6ebb9 is described below
commit 531f6ebb928bea0810319544b2ae65a4b41c7091
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Feb 28 16:12:59 2024 +0800
[core] Unify contains classes into lookup for first row (#2915)
---
.../apache/paimon/mergetree/ContainsLevels.java | 210 ---------------------
.../org/apache/paimon/mergetree/LookupLevels.java | 87 ++++++---
.../compact/ChangelogMergeTreeRewriter.java | 9 +-
.../compact/FirstRowMergeTreeCompactRewriter.java | 165 ----------------
.../compact/FistRowMergeFunctionWrapper.java | 75 ++++++++
.../FullChangelogMergeTreeCompactRewriter.java | 11 +-
.../compact/LookupMergeTreeCompactRewriter.java | 89 +++++++--
.../paimon/operation/KeyValueFileStoreWrite.java | 75 +++-----
.../apache/paimon/table/query/LocalTableQuery.java | 19 +-
.../paimon/mergetree/ContainsLevelsTest.java | 38 ++--
.../apache/paimon/mergetree/LookupLevelsTest.java | 21 ++-
.../LookupChangelogMergeFunctionWrapperTest.java | 4 +-
12 files changed, 291 insertions(+), 512 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
deleted file mode 100644
index cce18363c..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.mergetree;
-
-import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.lookup.LookupStoreFactory;
-import org.apache.paimon.lookup.LookupStoreReader;
-import org.apache.paimon.lookup.LookupStoreWriter;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BloomFilter;
-import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.IOFunction;
-
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
-import java.util.Comparator;
-import java.util.TreeSet;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** Provide contains key. */
-public class ContainsLevels implements Levels.DropFileCallback, Closeable {
-
- private static final byte[] EMPTY_VALUE = new byte[0];
-
- private final Levels levels;
- private final Comparator<InternalRow> keyComparator;
- private final RowCompactedSerializer keySerializer;
- private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
- private final Supplier<File> localFileFactory;
- private final LookupStoreFactory lookupStoreFactory;
- private final Cache<String, ContainsFile> containsFiles;
- private final Function<Long, BloomFilter.Builder> bfGenerator;
-
- public ContainsLevels(
- Levels levels,
- Comparator<InternalRow> keyComparator,
- RowType keyType,
- IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
- Supplier<File> localFileFactory,
- LookupStoreFactory lookupStoreFactory,
- Duration fileRetention,
- MemorySize maxDiskSize,
- Function<Long, BloomFilter.Builder> bfGenerator) {
- this.levels = levels;
- this.keyComparator = keyComparator;
- this.keySerializer = new RowCompactedSerializer(keyType);
- this.fileReaderFactory = fileReaderFactory;
- this.localFileFactory = localFileFactory;
- this.lookupStoreFactory = lookupStoreFactory;
- this.containsFiles =
- Caffeine.newBuilder()
- .expireAfterAccess(fileRetention)
- .maximumWeight(maxDiskSize.getKibiBytes())
- .weigher(this::fileWeigh)
- .removalListener(this::removalCallback)
- .executor(MoreExecutors.directExecutor())
- .build();
- levels.addDropFileCallback(this);
- this.bfGenerator = bfGenerator;
- }
-
- @VisibleForTesting
- Cache<String, ContainsFile> containsFiles() {
- return containsFiles;
- }
-
- @Override
- public void notifyDropFile(String file) {
- containsFiles.invalidate(file);
- }
-
- public boolean contains(InternalRow key, int startLevel) throws
IOException {
- Boolean result =
- LookupUtils.lookup(levels, key, startLevel, this::contains,
this::containsLevel0);
- return result != null && result;
- }
-
- @Nullable
- private Boolean containsLevel0(InternalRow key, TreeSet<DataFileMeta>
level0)
- throws IOException {
- return LookupUtils.lookupLevel0(keyComparator, key, level0,
this::contains);
- }
-
- @Nullable
- private Boolean contains(InternalRow key, SortedRun level) throws
IOException {
- return LookupUtils.lookup(keyComparator, key, level, this::contains);
- }
-
- @Nullable
- private Boolean contains(InternalRow key, DataFileMeta file) throws
IOException {
- ContainsFile containsFile =
containsFiles.getIfPresent(file.fileName());
- while (containsFile == null || containsFile.isClosed) {
- containsFile = createContainsFile(file);
- containsFiles.put(file.fileName(), containsFile);
- }
- if (containsFile.get(keySerializer.serializeToBytes(key)) != null) {
- return true;
- }
- return null;
- }
-
- private int fileWeigh(String file, ContainsFile containsFile) {
- return fileKibiBytes(containsFile.localFile);
- }
-
- private void removalCallback(String key, ContainsFile file, RemovalCause
cause) {
- if (file != null) {
- try {
- file.close();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
- private ContainsFile createContainsFile(DataFileMeta file) throws
IOException {
- File localFile = localFileFactory.get();
- if (!localFile.createNewFile()) {
- throw new IOException("Can not create new file: " + localFile);
- }
- LookupStoreWriter kvWriter =
- lookupStoreFactory.createWriter(localFile,
bfGenerator.apply(file.rowCount()));
- LookupStoreFactory.Context context;
- try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
- RecordReader.RecordIterator<KeyValue> batch;
- KeyValue kv;
- while ((batch = reader.readBatch()) != null) {
- while ((kv = batch.next()) != null) {
- byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
- kvWriter.put(keyBytes, EMPTY_VALUE);
- }
- batch.releaseBatch();
- }
- } catch (IOException e) {
- FileIOUtils.deleteFileOrDirectory(localFile);
- throw e;
- } finally {
- context = kvWriter.close();
- }
-
- return new ContainsFile(localFile,
lookupStoreFactory.createReader(localFile, context));
- }
-
- @Override
- public void close() throws IOException {
- containsFiles.invalidateAll();
- }
-
- private static class ContainsFile implements Closeable {
-
- private final File localFile;
- private final LookupStoreReader reader;
-
- private boolean isClosed = false;
-
- public ContainsFile(File localFile, LookupStoreReader reader) {
- this.localFile = localFile;
- this.reader = reader;
- }
-
- @Nullable
- public byte[] get(byte[] key) throws IOException {
- checkArgument(!isClosed);
- return reader.lookup(key);
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- isClosed = true;
- FileIOUtils.deleteFileOrDirectory(localFile);
- }
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index c35bda970..4869055ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -23,7 +23,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.lookup.LookupStoreWriter;
@@ -57,12 +56,12 @@ import static
org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Provide lookup by key. */
-public class LookupLevels implements Levels.DropFileCallback, Closeable {
+public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
private final RowCompactedSerializer keySerializer;
- private final RowCompactedSerializer valueSerializer;
+ private final ValueProcessor<T> valueProcessor;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
private final Supplier<File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
@@ -73,7 +72,7 @@ public class LookupLevels implements Levels.DropFileCallback,
Closeable {
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
- RowType valueType,
+ ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
@@ -83,7 +82,7 @@ public class LookupLevels implements Levels.DropFileCallback,
Closeable {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
- this.valueSerializer = new RowCompactedSerializer(valueType);
+ this.valueProcessor = valueProcessor;
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
@@ -114,23 +113,22 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
}
@Nullable
- public KeyValue lookup(InternalRow key, int startLevel) throws IOException
{
+ public T lookup(InternalRow key, int startLevel) throws IOException {
return LookupUtils.lookup(levels, key, startLevel, this::lookup,
this::lookupLevel0);
}
@Nullable
- private KeyValue lookupLevel0(InternalRow key, TreeSet<DataFileMeta>
level0)
- throws IOException {
+ private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0)
throws IOException {
return LookupUtils.lookupLevel0(keyComparator, key, level0,
this::lookup);
}
@Nullable
- private KeyValue lookup(InternalRow key, SortedRun level) throws
IOException {
+ private T lookup(InternalRow key, SortedRun level) throws IOException {
return LookupUtils.lookup(keyComparator, key, level, this::lookup);
}
@Nullable
- private KeyValue lookup(InternalRow key, DataFileMeta file) throws
IOException {
+ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
while (lookupFile == null || lookupFile.isClosed) {
@@ -143,13 +141,8 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
if (valueBytes == null) {
return null;
}
- InternalRow value = valueSerializer.deserialize(valueBytes);
- long sequenceNumber =
-
MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 9);
- RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length -
1]);
- return new KeyValue()
- .replace(key, sequenceNumber, rowKind, value)
- .setLevel(lookupFile.remoteFile().level());
+
+ return valueProcessor.readFromDisk(key,
lookupFile.remoteFile().level(), valueBytes);
}
private int fileWeigh(String file, LookupFile lookupFile) {
@@ -175,17 +168,12 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
lookupStoreFactory.createWriter(localFile,
bfGenerator.apply(file.rowCount()));
LookupStoreFactory.Context context;
try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
- DataOutputSerializer valueOut = new DataOutputSerializer(32);
RecordReader.RecordIterator<KeyValue> batch;
KeyValue kv;
while ((batch = reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
- valueOut.clear();
-
valueOut.write(valueSerializer.serializeToBytes(kv.value()));
- valueOut.writeLong(kv.sequenceNumber());
- valueOut.writeByte(kv.valueKind().toByteValue());
- byte[] valueBytes = valueOut.getCopyOfBuffer();
+ byte[] valueBytes = valueProcessor.persistToDisk(kv);
kvWriter.put(keyBytes, valueBytes);
}
batch.releaseBatch();
@@ -236,4 +224,57 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
FileIOUtils.deleteFileOrDirectory(localFile);
}
}
+
+ /** Processor to process value. */
+ public interface ValueProcessor<T> {
+
+ byte[] persistToDisk(KeyValue kv);
+
+ T readFromDisk(InternalRow key, int level, byte[] valueBytes);
+ }
+
+ /** A {@link ValueProcessor} to return {@link KeyValue}. */
+ public static class KeyValueProcessor implements ValueProcessor<KeyValue> {
+
+ private final RowCompactedSerializer valueSerializer;
+
+ public KeyValueProcessor(RowType valueType) {
+ this.valueSerializer = new RowCompactedSerializer(valueType);
+ }
+
+ @Override
+ public byte[] persistToDisk(KeyValue kv) {
+ byte[] vBytes = valueSerializer.serializeToBytes(kv.value());
+ byte[] bytes = new byte[vBytes.length + 8 + 1];
+ MemorySegment segment = MemorySegment.wrap(bytes);
+ segment.put(0, vBytes);
+ segment.putLong(bytes.length - 9, kv.sequenceNumber());
+ segment.put(bytes.length - 1, kv.valueKind().toByteValue());
+ return bytes;
+ }
+
+ @Override
+ public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes)
{
+ InternalRow value = valueSerializer.deserialize(bytes);
+ long sequenceNumber =
MemorySegment.wrap(bytes).getLong(bytes.length - 9);
+ RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]);
+ return new KeyValue().replace(key, sequenceNumber, rowKind,
value).setLevel(level);
+ }
+ }
+
+ /** A {@link ValueProcessor} to return {@link Boolean} only. */
+ public static class ContainsValueProcessor implements
ValueProcessor<Boolean> {
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ @Override
+ public byte[] persistToDisk(KeyValue kv) {
+ return EMPTY_BYTES;
+ }
+
+ @Override
+ public Boolean readFromDisk(InternalRow key, int level, byte[] bytes) {
+ return Boolean.TRUE;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index c57087eea..f056fafaa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
@@ -45,8 +44,6 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
protected final int maxLevel;
protected final MergeEngine mergeEngine;
- protected final RecordEqualiser valueEqualiser;
- protected final boolean changelogRowDeduplicate;
public ChangelogMergeTreeRewriter(
int maxLevel,
@@ -55,14 +52,10 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- MergeSorter mergeSorter,
- RecordEqualiser valueEqualiser,
- boolean changelogRowDeduplicate) {
+ MergeSorter mergeSorter) {
super(readerFactory, writerFactory, keyComparator, mfFactory,
mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
- this.valueEqualiser = valueEqualiser;
- this.changelogRowDeduplicate = changelogRowDeduplicate;
}
protected abstract boolean rewriteChangelog(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
deleted file mode 100644
index 24f872da1..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.mergetree.compact;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.KeyValue;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.codegen.RecordEqualiser;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.KeyValueFileReaderFactory;
-import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.mergetree.ContainsLevels;
-import org.apache.paimon.mergetree.MergeSorter;
-import org.apache.paimon.mergetree.SortedRun;
-import org.apache.paimon.utils.Filter;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Comparator;
-import java.util.List;
-
-import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
-import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
-import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/**
- * A {@link MergeTreeCompactRewriter} for first row merge engine which
produces changelog files by
- * contains for the compaction involving level 0 files.
- */
-public class FirstRowMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
-
- private final ContainsLevels containsLevels;
-
- public FirstRowMergeTreeCompactRewriter(
- int maxLevel,
- CoreOptions.MergeEngine mergeEngine,
- ContainsLevels containsLevels,
- KeyValueFileReaderFactory readerFactory,
- KeyValueFileWriterFactory writerFactory,
- Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory,
- MergeSorter mergeSorter,
- RecordEqualiser valueEqualiser,
- boolean changelogRowDeduplicate) {
- super(
- maxLevel,
- mergeEngine,
- readerFactory,
- writerFactory,
- keyComparator,
- mfFactory,
- mergeSorter,
- valueEqualiser,
- changelogRowDeduplicate);
- this.containsLevels = containsLevels;
- }
-
- @Override
- protected boolean rewriteChangelog(
- int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
- return rewriteLookupChangelog(outputLevel, sections);
- }
-
- @Override
- protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta
file) {
- if (file.level() != 0) {
- return NO_CHANGELOG;
- }
-
- if (outputLevel == maxLevel) {
- return CHANGELOG_NO_REWRITE;
- }
-
- // FIRST_ROW must rewrite file, because some records that are already
at higher level may be
- // skipped
- // See LookupMergeFunction, it just returns newly records.
- return CHANGELOG_WITH_REWRITE;
- }
-
- @Override
- protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
- return new FistRowMergeFunctionWrapper(
- mfFactory,
- key -> {
- try {
- return containsLevels.contains(key, outputLevel + 1);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- }
-
- @Override
- public void close() throws IOException {
- containsLevels.close();
- }
-
- @VisibleForTesting
- static class FistRowMergeFunctionWrapper implements
MergeFunctionWrapper<ChangelogResult> {
-
- private final Filter<InternalRow> contains;
- private final FirstRowMergeFunction mergeFunction;
- private final ChangelogResult reusedResult = new ChangelogResult();
-
- public FistRowMergeFunctionWrapper(
- MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Filter<InternalRow> contains) {
- this.contains = contains;
- MergeFunction<KeyValue> mergeFunction =
mergeFunctionFactory.create();
- checkArgument(
- mergeFunction instanceof FirstRowMergeFunction,
- "Merge function should be a FirstRowMergeFunction, but is
%s, there is a bug.",
- mergeFunction.getClass().getName());
- this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
- }
-
- @Override
- public void reset() {
- mergeFunction.reset();
- }
-
- @Override
- public void add(KeyValue kv) {
- mergeFunction.add(kv);
- }
-
- @Override
- public ChangelogResult getResult() {
- reusedResult.reset();
- KeyValue result = mergeFunction.getResult();
- if (result == null) {
- return reusedResult;
- }
-
- if (contains.test(result.key())) {
- // empty
- return reusedResult;
- }
-
- reusedResult.setResult(result);
- if (result.level() == 0) {
- // new record, output changelog
- return reusedResult.addChangelog(result);
- }
- return reusedResult;
- }
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java
new file mode 100644
index 000000000..fe96658c9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FistRowMergeFunctionWrapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.Filter;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Wrapper for {@link MergeFunction}s to produce changelog by lookup for
first row. */
+public class FistRowMergeFunctionWrapper implements
MergeFunctionWrapper<ChangelogResult> {
+
+ private final Filter<InternalRow> contains;
+ private final FirstRowMergeFunction mergeFunction;
+ private final ChangelogResult reusedResult = new ChangelogResult();
+
+ public FistRowMergeFunctionWrapper(
+ MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Filter<InternalRow> contains) {
+ this.contains = contains;
+ MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
+ checkArgument(
+ mergeFunction instanceof FirstRowMergeFunction,
+ "Merge function should be a FirstRowMergeFunction, but is %s,
there is a bug.",
+ mergeFunction.getClass().getName());
+ this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
+ }
+
+ @Override
+ public void reset() {
+ mergeFunction.reset();
+ }
+
+ @Override
+ public void add(KeyValue kv) {
+ mergeFunction.add(kv);
+ }
+
+ @Override
+ public ChangelogResult getResult() {
+ reusedResult.reset();
+ KeyValue result = mergeFunction.getResult();
+ if (result == null) {
+ return reusedResult;
+ }
+
+ if (contains.test(result.key())) {
+ // empty
+ return reusedResult;
+ }
+
+ reusedResult.setResult(result);
+ if (result.level() == 0) {
+ // new record, output changelog
+ return reusedResult.addChangelog(result);
+ }
+ return reusedResult;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index b8d64fa86..553b0ff5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -39,6 +39,9 @@ import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.Upg
/** A {@link MergeTreeCompactRewriter} which produces changelog files for each
full compaction. */
public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
+ private final RecordEqualiser valueEqualiser;
+ private final boolean changelogRowDeduplicate;
+
public FullChangelogMergeTreeCompactRewriter(
int maxLevel,
CoreOptions.MergeEngine mergeEngine,
@@ -47,7 +50,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
- RecordEqualiser valueComparator,
+ RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
super(
maxLevel,
@@ -56,9 +59,9 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
writerFactory,
keyComparator,
mfFactory,
- mergeSorter,
- valueComparator,
- changelogRowDeduplicate);
+ mergeSorter);
+ this.valueEqualiser = valueEqualiser;
+ this.changelogRowDeduplicate = changelogRowDeduplicate;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index c78ded5ff..b88c26b99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -42,21 +42,21 @@ import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.Upg
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup
for the compaction
* involving level 0 files.
*/
-public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter
{
+public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewriter {
- private final LookupLevels lookupLevels;
+ private final LookupLevels<T> lookupLevels;
+ private final MergeFunctionWrapperFactory<T> wrapperFactory;
public LookupMergeTreeCompactRewriter(
int maxLevel,
MergeEngine mergeEngine,
- LookupLevels lookupLevels,
+ LookupLevels<T> lookupLevels,
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
- RecordEqualiser valueEqualiser,
- boolean changelogRowDeduplicate) {
+ MergeFunctionWrapperFactory<T> wrapperFactory) {
super(
maxLevel,
mergeEngine,
@@ -64,10 +64,9 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
writerFactory,
keyComparator,
mfFactory,
- mergeSorter,
- valueEqualiser,
- changelogRowDeduplicate);
+ mergeSorter);
this.lookupLevels = lookupLevels;
+ this.wrapperFactory = wrapperFactory;
}
@Override
@@ -100,21 +99,73 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
- return new LookupChangelogMergeFunctionWrapper(
- mfFactory,
- key -> {
- try {
- return lookupLevels.lookup(key, outputLevel + 1);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- },
- valueEqualiser,
- changelogRowDeduplicate);
+ return wrapperFactory.create(mfFactory, outputLevel, lookupLevels);
}
@Override
public void close() throws IOException {
lookupLevels.close();
}
+
+ /** Factory to create {@link MergeFunctionWrapper}. */
+ public interface MergeFunctionWrapperFactory<T> {
+
+ MergeFunctionWrapper<ChangelogResult> create(
+ MergeFunctionFactory<KeyValue> mfFactory,
+ int outputLevel,
+ LookupLevels<T> lookupLevels);
+ }
+
+ /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper.
*/
+ public static class LookupMergeFunctionWrapperFactory
+ implements MergeFunctionWrapperFactory<KeyValue> {
+
+ private final RecordEqualiser valueEqualiser;
+ private final boolean changelogRowDeduplicate;
+
+ public LookupMergeFunctionWrapperFactory(
+ RecordEqualiser valueEqualiser, boolean
changelogRowDeduplicate) {
+ this.valueEqualiser = valueEqualiser;
+ this.changelogRowDeduplicate = changelogRowDeduplicate;
+ }
+
+ @Override
+ public MergeFunctionWrapper<ChangelogResult> create(
+ MergeFunctionFactory<KeyValue> mfFactory,
+ int outputLevel,
+ LookupLevels<KeyValue> lookupLevels) {
+ return new LookupChangelogMergeFunctionWrapper(
+ mfFactory,
+ key -> {
+ try {
+ return lookupLevels.lookup(key, outputLevel + 1);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ },
+ valueEqualiser,
+ changelogRowDeduplicate);
+ }
+ }
+
+ /** A {@link MergeFunctionWrapperFactory} for first row. */
+ public static class FirstRowMergeFunctionWrapperFactory
+ implements MergeFunctionWrapperFactory<Boolean> {
+
+ @Override
+ public MergeFunctionWrapper<ChangelogResult> create(
+ MergeFunctionFactory<KeyValue> mfFactory,
+ int outputLevel,
+ LookupLevels<Boolean> lookupLevels) {
+ return new FistRowMergeFunctionWrapper(
+ mfFactory,
+ key -> {
+ try {
+ return lookupLevels.lookup(key, outputLevel + 1)
!= null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 3073dc8d9..6a18a34dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,17 +35,19 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
-import org.apache.paimon.mergetree.ContainsLevels;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
+import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
-import org.apache.paimon.mergetree.compact.FirstRowMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
+import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
+import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
@@ -243,74 +245,51 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
.copyWithoutProjection()
.withValueProjection(new int[0][])
.build(partition, bucket);
- ContainsLevels containsLevels =
createContainsLevels(levels, keyOnlyReader);
- return new FirstRowMergeTreeCompactRewriter(
+ return new LookupMergeTreeCompactRewriter<>(
maxLevel,
mergeEngine,
- containsLevels,
+ createLookupLevels(levels, new
ContainsValueProcessor(), keyOnlyReader),
readerFactory,
writerFactory,
keyComparator,
mfFactory,
mergeSorter,
- valueEqualiserSupplier.get(),
- options.changelogRowDeduplicate());
+ new FirstRowMergeFunctionWrapperFactory());
+ } else {
+ return new LookupMergeTreeCompactRewriter<>(
+ maxLevel,
+ mergeEngine,
+ createLookupLevels(
+ levels, new KeyValueProcessor(valueType),
readerFactory),
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ mfFactory,
+ mergeSorter,
+ new LookupMergeFunctionWrapperFactory(
+ valueEqualiserSupplier.get(),
+ options.changelogRowDeduplicate()));
}
- LookupLevels lookupLevels = createLookupLevels(levels,
readerFactory);
- return new LookupMergeTreeCompactRewriter(
- maxLevel,
- mergeEngine,
- lookupLevels,
- readerFactory,
- writerFactory,
- keyComparator,
- mfFactory,
- mergeSorter,
- valueEqualiserSupplier.get(),
- options.changelogRowDeduplicate());
default:
return new MergeTreeCompactRewriter(
readerFactory, writerFactory, keyComparator,
mfFactory, mergeSorter);
}
}
- private LookupLevels createLookupLevels(
- Levels levels, KeyValueFileReaderFactory readerFactory) {
- if (ioManager == null) {
- throw new RuntimeException(
- "Can not use lookup, there is no temp disk directory to
use.");
- }
- Options options = this.options.toConfiguration();
- return new LookupLevels(
- levels,
- keyComparatorSupplier.get(),
- keyType,
- valueType,
- file ->
- readerFactory.createRecordReader(
- file.schemaId(), file.fileName(),
file.fileSize(), file.level()),
- () -> ioManager.createChannel().getPathFile(),
- new HashLookupStoreFactory(
- cacheManager,
- this.options.cachePageSize(),
- options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
- options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
- options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
- bfGenerator(options));
- }
-
- private ContainsLevels createContainsLevels(
- Levels levels, KeyValueFileReaderFactory readerFactory) {
+ private <T> LookupLevels<T> createLookupLevels(
+ Levels levels,
+ LookupLevels.ValueProcessor<T> valueProcessor,
+ KeyValueFileReaderFactory readerFactory) {
if (ioManager == null) {
throw new RuntimeException(
"Can not use lookup, there is no temp disk directory to
use.");
}
Options options = this.options.toConfiguration();
- return new ContainsLevels(
+ return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
keyType,
+ valueProcessor,
file ->
readerFactory.createRecordReader(
file.schemaId(), file.fileName(),
file.fileSize(), file.level()),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 425d3d621..9f008eb39 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -53,7 +53,7 @@ import static
org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
/** Implementation for {@link TableQuery} for caching data and file in local.
*/
public class LocalTableQuery implements TableQuery {
- private final Map<BinaryRow, Map<Integer, LookupLevels>> tableView;
+ private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>>
tableView;
private final CoreOptions options;
@@ -125,12 +125,13 @@ public class LocalTableQuery implements TableQuery {
Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles,
options.numLevels());
KeyValueFileReaderFactory factory =
readerFactoryBuilder.build(partition, bucket);
Options options = this.options.toConfiguration();
- LookupLevels lookupLevels =
- new LookupLevels(
+ LookupLevels<KeyValue> lookupLevels =
+ new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
readerFactoryBuilder.keyType(),
- readerFactoryBuilder.projectedValueType(),
+ new LookupLevels.KeyValueProcessor(
+ readerFactoryBuilder.projectedValueType()),
file ->
factory.createRecordReader(
file.schemaId(),
@@ -154,11 +155,11 @@ public class LocalTableQuery implements TableQuery {
@Override
public synchronized InternalRow lookup(BinaryRow partition, int bucket,
InternalRow key)
throws IOException {
- Map<Integer, LookupLevels> buckets = tableView.get(partition);
+ Map<Integer, LookupLevels<KeyValue>> buckets =
tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
}
- LookupLevels lookupLevels = buckets.get(bucket);
+ LookupLevels<KeyValue> lookupLevels = buckets.get(bucket);
if (lookupLevels == null) {
return null;
}
@@ -189,8 +190,10 @@ public class LocalTableQuery implements TableQuery {
@Override
public void close() throws IOException {
- for (Map.Entry<BinaryRow, Map<Integer, LookupLevels>> buckets :
tableView.entrySet()) {
- for (Map.Entry<Integer, LookupLevels> bucket :
buckets.getValue().entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, LookupLevels<KeyValue>>>
buckets :
+ tableView.entrySet()) {
+ for (Map.Entry<Integer, LookupLevels<KeyValue>> bucket :
+ buckets.getValue().entrySet()) {
bucket.getValue().close();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 6db0abc90..631a6e47f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -64,7 +64,7 @@ import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test {@link ContainsLevels}. */
+/** Test {@link LookupLevels} for contains. */
public class ContainsLevelsTest {
private static final String LOOKUP_FILE_PREFIX = "lookup-";
@@ -88,22 +88,23 @@ public class ContainsLevelsTest {
newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
newFile(2, kv(2, 22), kv(5, 55))),
3);
- ContainsLevels containsLevels = createContainsLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<Boolean> containsLevels =
+ createContainsLevels(levels, MemorySize.ofMebiBytes(10));
// only in level 1
- assertThat(containsLevels.contains(row(1), 1)).isTrue();
+ assertThat(containsLevels.lookup(row(1), 1)).isTrue();
// only in level 2
- assertThat(containsLevels.contains(row(2), 1)).isTrue();
+ assertThat(containsLevels.lookup(row(2), 1)).isTrue();
// both in level 1 and level 2
- assertThat(containsLevels.contains(row(5), 1)).isTrue();
+ assertThat(containsLevels.lookup(row(5), 1)).isTrue();
// no exists
- assertThat(containsLevels.contains(row(4), 1)).isFalse();
+ assertThat(containsLevels.lookup(row(4), 1)).isNull();
containsLevels.close();
-
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
+ assertThat(containsLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
@Test
@@ -117,7 +118,8 @@ public class ContainsLevelsTest {
newFile(1, kv(7, 77), kv(8, 88)),
newFile(1, kv(10, 1010), kv(11, 1111))),
1);
- ContainsLevels containsLevels = createContainsLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<Boolean> containsLevels =
+ createContainsLevels(levels, MemorySize.ofMebiBytes(10));
Map<Integer, Integer> contains =
new HashMap<Integer, Integer>() {
@@ -133,16 +135,16 @@ public class ContainsLevelsTest {
}
};
for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
- assertThat(containsLevels.contains(row(entry.getKey()),
1)).isTrue();
+ assertThat(containsLevels.lookup(row(entry.getKey()), 1)).isTrue();
}
int[] notContains = new int[] {0, 3, 6, 9, 12};
for (int key : notContains) {
- assertThat(containsLevels.contains(row(key), 1)).isFalse();
+ assertThat(containsLevels.lookup(row(key), 1)).isNull();
}
containsLevels.close();
-
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
+ assertThat(containsLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
@Test
@@ -159,28 +161,30 @@ public class ContainsLevelsTest {
files.add(newFile(1, kvs.toArray(new KeyValue[0])));
}
Levels levels = new Levels(comparator, files, 1);
- ContainsLevels lookupLevels = createContainsLevels(levels,
MemorySize.ofKibiBytes(60));
+ LookupLevels<Boolean> lookupLevels =
+ createContainsLevels(levels, MemorySize.ofKibiBytes(60));
for (int i = 0; i < fileNum * recordInFile; i++) {
- assertThat(lookupLevels.contains(row(i), 1)).isTrue();
+ assertThat(lookupLevels.lookup(row(i), 1)).isTrue();
}
// some files are invalided
- long fileNumber = lookupLevels.containsFiles().estimatedSize();
+ long fileNumber = lookupLevels.lookupFiles().estimatedSize();
String[] lookupFiles =
tempDir.toFile().list((dir, name) ->
name.startsWith(LOOKUP_FILE_PREFIX));
assertThat(lookupFiles).isNotNull();
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
lookupLevels.close();
- assertThat(lookupLevels.containsFiles().estimatedSize()).isEqualTo(0);
+ assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
- private ContainsLevels createContainsLevels(Levels levels, MemorySize
maxDiskSize) {
- return new ContainsLevels(
+ private LookupLevels<Boolean> createContainsLevels(Levels levels,
MemorySize maxDiskSize) {
+ return new LookupLevels<>(
levels,
comparator,
keyType,
+ new LookupLevels.ContainsValueProcessor(),
file ->
createReaderFactory()
.createRecordReader(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 4dc7504e5..e7df8fa8d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -89,7 +89,8 @@ public class LookupLevelsTest {
newFile(1, kv(1, 11, 1), kv(3, 33, 2), kv(5,
5, 3)),
newFile(2, kv(2, 22, 4), kv(5, 55, 5))),
3);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<KeyValue> lookupLevels =
+ createLookupLevels(levels, MemorySize.ofMebiBytes(10));
// only in level 1
KeyValue kv = lookupLevels.lookup(row(1), 1);
@@ -131,7 +132,8 @@ public class LookupLevelsTest {
newFile(1, kv(7, 77), kv(8, 88)),
newFile(1, kv(10, 1010), kv(11, 1111))),
1);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<KeyValue> lookupLevels =
+ createLookupLevels(levels, MemorySize.ofMebiBytes(10));
Map<Integer, Integer> contains =
new HashMap<Integer, Integer>() {
@@ -178,7 +180,8 @@ public class LookupLevelsTest {
files.add(newFile(1, kvs.toArray(new KeyValue[0])));
}
Levels levels = new Levels(comparator, files, 1);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofKibiBytes(20));
+ LookupLevels<KeyValue> lookupLevels =
+ createLookupLevels(levels, MemorySize.ofKibiBytes(20));
for (int i = 0; i < fileNum * recordInFile; i++) {
KeyValue kv = lookupLevels.lookup(row(i), 1);
@@ -209,7 +212,8 @@ public class LookupLevelsTest {
// empty level 2
newFile(3, kv(2, 22), kv(5, 55))),
3);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<KeyValue> lookupLevels =
+ createLookupLevels(levels, MemorySize.ofMebiBytes(10));
KeyValue kv = lookupLevels.lookup(row(2), 1);
assertThat(kv).isNotNull();
@@ -225,7 +229,8 @@ public class LookupLevelsTest {
newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
newFile(2, kv(2, 22), kv(5, 55))),
3);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ LookupLevels<KeyValue> lookupLevels =
+ createLookupLevels(levels, MemorySize.ofMebiBytes(10));
KeyValue kv = lookupLevels.lookup(row(1), 0);
assertThat(kv).isNotNull();
@@ -250,12 +255,12 @@ public class LookupLevelsTest {
assertThat(kv.value().getInt(1)).isEqualTo(11);
}
- private LookupLevels createLookupLevels(Levels levels, MemorySize
maxDiskSize) {
- return new LookupLevels(
+ private LookupLevels<KeyValue> createLookupLevels(Levels levels,
MemorySize maxDiskSize) {
+ return new LookupLevels<>(
levels,
comparator,
keyType,
- rowType,
+ new LookupLevels.KeyValueProcessor(rowType),
file ->
createReaderFactory()
.createRecordReader(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index df51eecff..7564ca07d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -296,8 +296,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
@Test
public void testFirstRow() {
Set<InternalRow> highLevel = new HashSet<>();
- FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper function =
- new
FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper(
+ FistRowMergeFunctionWrapper function =
+ new FistRowMergeFunctionWrapper(
projection ->
new FirstRowMergeFunction(
new RowType(