This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8aaa687e1 [core] Optimize First row merge engine with lookup (#1644)
8aaa687e1 is described below
commit 8aaa687e1d97e99d173dca9f50b630b7914f5422
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 26 10:06:03 2023 +0800
[core] Optimize First row merge engine with lookup (#1644)
---
docs/content/concepts/primary-key-table.md | 13 +-
.../org/apache/paimon/io/cache/CacheManager.java | 23 +--
.../org/apache/paimon/utils/BiFunctionWithIOE.java | 35 ++++
.../paimon/io/KeyValueFileReaderFactory.java | 12 ++
.../apache/paimon/mergetree/ContainsLevels.java | 194 +++++++++++++++++++++
.../org/apache/paimon/mergetree/LookupLevels.java | 96 +++-------
.../org/apache/paimon/mergetree/LookupUtils.java | 103 +++++++++++
.../compact/ChangelogMergeTreeRewriter.java | 17 ++
....java => FirstRowMergeTreeCompactRewriter.java} | 92 +++++++---
.../compact/FullChangelogMergeFunctionWrapper.java | 6 -
.../LookupChangelogMergeFunctionWrapper.java | 10 +-
.../mergetree/compact/LookupMergeFunction.java | 11 +-
.../compact/LookupMergeTreeCompactRewriter.java | 15 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 40 +++++
.../org/apache/paimon/schema/SchemaValidation.java | 24 ++-
...okupLevelsTest.java => ContainsLevelsTest.java} | 66 +++----
.../apache/paimon/mergetree/LookupLevelsTest.java | 8 +-
.../FullChangelogMergeFunctionWrapperTestBase.java | 122 +------------
.../LookupChangelogMergeFunctionWrapperTest.java | 38 ++--
.../mergetree/compact/SortMergeReaderTestBase.java | 7 +-
.../mergetree/compact/UniversalCompactionTest.java | 26 +++
.../paimon/flink/lookup/RocksDBListState.java | 32 ++--
.../apache/paimon/flink/lookup/RocksDBState.java | 11 +-
.../org/apache/paimon/flink/FirstRowITCase.java | 71 +++-----
.../apache/paimon/flink/ReadWriteTableITCase.java | 18 ++
25 files changed, 675 insertions(+), 415 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 868497995..d79bd0718 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -216,15 +216,12 @@ For streaming queries, `aggregation` merge engine must be
used together with `lo
### First Row
-By specifying `'merge-engine' = 'first-row'`, users can keep the first row of
the same primary key. It differs from the `deduplicate` merge engine that in
the `first-row` merge engine, it will generate insert only changelog.
+By specifying `'merge-engine' = 'first-row'`, users can keep the first row of
the same primary key. It differs from the
+`deduplicate` merge engine that in the `first-row` merge engine, it will
generate insert only changelog.
-{{< hint info >}}
-For streaming queries, `first-row` merge engine must be used together with
`lookup` or `full-compaction` [changelog producer]({{< ref
"concepts/primary-key-table#changelog-producers" >}}).
-{{< /hint >}}
-
-{{< hint info >}}
-Currently, only the first row of insert order supported, so you can not
specify `sequence.field` for this merge engine. And also not accept `DELETE`
and `UPDATE_BEFORE` message.
-{{< /hint>}}
+1. `first-row` merge engine must be used together with `lookup` [changelog
producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
+2. You can not specify `sequence.field`.
+3. Not accept `DELETE` and `UPDATE_BEFORE` message.
## Changelog Producers
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 92d9bf296..d1eece7bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -61,17 +61,15 @@ public class CacheManager {
public MemorySegment getPage(
RandomAccessFile file, int pageNumber, Consumer<Integer>
cleanCallback) {
CacheKey key = new CacheKey(file, pageNumber);
- CacheValue value;
- value =
- cache.get(
- key,
- cacheKey -> {
- try {
- return createValue(key, cleanCallback);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ CacheValue value = cache.getIfPresent(key);
+ while (value == null || value.isClosed) {
+ try {
+ value = createValue(key, cleanCallback);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ cache.put(key, value);
+ }
return value.segment;
}
@@ -84,6 +82,7 @@ public class CacheManager {
}
private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause)
{
+ value.isClosed = true;
value.cleanCallback.accept(key.pageNumber);
}
@@ -135,6 +134,8 @@ public class CacheManager {
private final MemorySegment segment;
private final Consumer<Integer> cleanCallback;
+ private boolean isClosed = false;
+
private CacheValue(MemorySegment segment, Consumer<Integer>
cleanCallback) {
this.segment = segment;
this.cleanCallback = cleanCallback;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java
new file mode 100644
index 000000000..bd5f074d6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BiFunctionWithIOE.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+
+/** A bi function with {@link IOException}. */
+@FunctionalInterface
+public interface BiFunctionWithIOE<T, U, R> {
+
+ /**
+ * Applies this function to the given arguments.
+ *
+ * @param t the first function argument
+ * @param u the second function argument
+ * @return the function result
+ */
+ R apply(T t, U u) throws IOException;
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index e9413715f..71fbbec9a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -157,6 +157,18 @@ public class KeyValueFileReaderFactory {
applyProjection();
}
+ public Builder copyWithoutProjection() {
+ return new Builder(
+ fileIO,
+ schemaManager,
+ schemaId,
+ keyType,
+ valueType,
+ formatDiscover,
+ pathFactory,
+ extractor);
+ }
+
public Builder withKeyProjection(int[][] projection) {
keyProjection = projection;
applyProjection();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
new file mode 100644
index 000000000..953f7a85f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/ContainsLevels.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.IOFunction;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Provide contains key. */
+public class ContainsLevels implements Levels.DropFileCallback, Closeable {
+
+ private static final byte[] EMPTY_VALUE = new byte[0];
+
+ private final Levels levels;
+ private final Comparator<InternalRow> keyComparator;
+ private final RowCompactedSerializer keySerializer;
+ private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
+ private final Supplier<File> localFileFactory;
+ private final LookupStoreFactory lookupStoreFactory;
+
+ private final Cache<String, ContainsFile> containsFiles;
+
+ public ContainsLevels(
+ Levels levels,
+ Comparator<InternalRow> keyComparator,
+ RowType keyType,
+ IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
+ Supplier<File> localFileFactory,
+ LookupStoreFactory lookupStoreFactory,
+ Duration fileRetention,
+ MemorySize maxDiskSize) {
+ this.levels = levels;
+ this.keyComparator = keyComparator;
+ this.keySerializer = new RowCompactedSerializer(keyType);
+ this.fileReaderFactory = fileReaderFactory;
+ this.localFileFactory = localFileFactory;
+ this.lookupStoreFactory = lookupStoreFactory;
+ this.containsFiles =
+ Caffeine.newBuilder()
+ .expireAfterAccess(fileRetention)
+ .maximumWeight(maxDiskSize.getKibiBytes())
+ .weigher(this::fileWeigh)
+ .removalListener(this::removalCallback)
+ .executor(MoreExecutors.directExecutor())
+ .build();
+ levels.addDropFileCallback(this);
+ }
+
+ @VisibleForTesting
+ Cache<String, ContainsFile> containsFiles() {
+ return containsFiles;
+ }
+
+ @Override
+ public void notifyDropFile(String file) {
+ containsFiles.invalidate(file);
+ }
+
+ public boolean contains(InternalRow key, int startLevel) throws
IOException {
+ Boolean result = LookupUtils.lookup(levels, key, startLevel,
this::contains);
+ return result != null && result;
+ }
+
+ @Nullable
+ private Boolean contains(InternalRow key, SortedRun level) throws
IOException {
+ return LookupUtils.lookup(keyComparator, key, level, this::contains);
+ }
+
+ @Nullable
+ private Boolean contains(InternalRow key, DataFileMeta file) throws
IOException {
+ ContainsFile containsFile =
containsFiles.getIfPresent(file.fileName());
+ while (containsFile == null || containsFile.isClosed) {
+ containsFile = createContainsFile(file);
+ containsFiles.put(file.fileName(), containsFile);
+ }
+ if (containsFile.get(keySerializer.serializeToBytes(key)) != null) {
+ return true;
+ }
+ return null;
+ }
+
+ private int fileWeigh(String file, ContainsFile containsFile) {
+ return fileKibiBytes(containsFile.localFile);
+ }
+
+ private void removalCallback(String key, ContainsFile file, RemovalCause
cause) {
+ if (file != null) {
+ try {
+ file.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ private ContainsFile createContainsFile(DataFileMeta file) throws
IOException {
+ File localFile = localFileFactory.get();
+ if (!localFile.createNewFile()) {
+ throw new IOException("Can not create new file: " + localFile);
+ }
+ try (LookupStoreWriter kvWriter =
lookupStoreFactory.createWriter(localFile);
+ RecordReader<KeyValue> reader = fileReaderFactory.apply(file))
{
+ RecordReader.RecordIterator<KeyValue> batch;
+ KeyValue kv;
+ while ((batch = reader.readBatch()) != null) {
+ while ((kv = batch.next()) != null) {
+ byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
+ kvWriter.put(keyBytes, EMPTY_VALUE);
+ }
+ batch.releaseBatch();
+ }
+ } catch (IOException e) {
+ FileIOUtils.deleteFileOrDirectory(localFile);
+ throw e;
+ }
+
+ return new ContainsFile(localFile,
lookupStoreFactory.createReader(localFile));
+ }
+
+ @Override
+ public void close() throws IOException {
+ containsFiles.invalidateAll();
+ }
+
+ private static class ContainsFile implements Closeable {
+
+ private final File localFile;
+ private final LookupStoreReader reader;
+
+ private boolean isClosed = false;
+
+ public ContainsFile(File localFile, LookupStoreReader reader) {
+ this.localFile = localFile;
+ this.reader = reader;
+ }
+
+ @Nullable
+ public byte[] get(byte[] key) throws IOException {
+ checkArgument(!isClosed);
+ return reader.lookup(key);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ isClosed = true;
+ FileIOUtils.deleteFileOrDirectory(localFile);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 444e86e95..4fee4b9d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -35,9 +35,10 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;
-import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
-import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
-import
org.apache.paimon.shade.guava30.com.google.common.cache.RemovalNotification;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nullable;
@@ -47,10 +48,11 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
+import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Provide lookup by key. */
public class LookupLevels implements Levels.DropFileCallback, Closeable {
@@ -82,11 +84,12 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
this.lookupFiles =
- CacheBuilder.newBuilder()
+ Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher(this::fileWeigh)
.removalListener(this::removalCallback)
+ .executor(MoreExecutors.directExecutor())
.build();
levels.addDropFileCallback(this);
}
@@ -103,65 +106,22 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
@Nullable
public KeyValue lookup(InternalRow key, int startLevel) throws IOException
{
- if (startLevel == 0) {
- throw new IllegalArgumentException("Start level can not be zero.");
- }
-
- KeyValue kv = null;
- for (int i = startLevel; i < levels.numberOfLevels(); i++) {
- SortedRun level = levels.runOfLevel(i);
- kv = lookup(key, level);
- if (kv != null) {
- break;
- }
- }
-
- return kv;
+ return LookupUtils.lookup(levels, key, startLevel, this::lookup);
}
@Nullable
- private KeyValue lookup(InternalRow target, SortedRun level) throws
IOException {
- List<DataFileMeta> files = level.files();
- int left = 0;
- int right = files.size() - 1;
-
- // binary search restart positions to find the restart position
immediately before the
- // targetKey
- while (left < right) {
- int mid = (left + right) / 2;
-
- if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
- // Key at "mid.max" is < "target". Therefore all
- // files at or before "mid" are uninteresting.
- left = mid + 1;
- } else {
- // Key at "mid.max" is >= "target". Therefore all files
- // after "mid" are uninteresting.
- right = mid;
- }
- }
-
- int index = right;
-
- // if the index is now pointing to the last file, check if the largest
key in the block is
- // than the target key. If so, we need to seek beyond the end of this
file
- if (index == files.size() - 1
- && keyComparator.compare(files.get(index).maxKey(), target) <
0) {
- index++;
- }
-
- // if files does not have a next, it means the key does not exist in
this level
- return index < files.size() ? lookup(target, files.get(index)) : null;
+ private KeyValue lookup(InternalRow key, SortedRun level) throws
IOException {
+ return LookupUtils.lookup(keyComparator, key, level, this::lookup);
}
@Nullable
private KeyValue lookup(InternalRow key, DataFileMeta file) throws
IOException {
- LookupFile lookupFile;
- try {
- lookupFile = lookupFiles.get(file.fileName(), () ->
createLookupFile(file));
- } catch (ExecutionException e) {
- throw new IOException(e);
+ LookupFile lookupFile = lookupFiles.getIfPresent(file.fileName());
+ while (lookupFile == null || lookupFile.isClosed) {
+ lookupFile = createLookupFile(file);
+ lookupFiles.put(file.fileName(), lookupFile);
}
+
byte[] keyBytes = keySerializer.serializeToBytes(key);
byte[] valueBytes = lookupFile.get(keyBytes);
if (valueBytes == null) {
@@ -176,14 +136,13 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
}
private int fileWeigh(String file, LookupFile lookupFile) {
- return lookupFile.fileKibiBytes();
+ return fileKibiBytes(lookupFile.localFile);
}
- private void removalCallback(RemovalNotification<String, LookupFile>
notification) {
- LookupFile reader = notification.getValue();
- if (reader != null) {
+ private void removalCallback(String key, LookupFile file, RemovalCause
cause) {
+ if (file != null) {
try {
- reader.close();
+ file.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -231,6 +190,8 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
+ private boolean isClosed = false;
+
public LookupFile(File localFile, DataFileMeta remoteFile,
LookupStoreReader reader) {
this.localFile = localFile;
this.remoteFile = remoteFile;
@@ -239,18 +200,10 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
@Nullable
public byte[] get(byte[] key) throws IOException {
+ checkArgument(!isClosed);
return reader.lookup(key);
}
- public int fileKibiBytes() {
- long kibiBytes = localFile.length() >> 10;
- if (kibiBytes > Integer.MAX_VALUE) {
- throw new RuntimeException(
- "Lookup file is too big: " +
MemorySize.ofKibiBytes(kibiBytes));
- }
- return (int) kibiBytes;
- }
-
public DataFileMeta remoteFile() {
return remoteFile;
}
@@ -258,6 +211,7 @@ public class LookupLevels implements
Levels.DropFileCallback, Closeable {
@Override
public void close() throws IOException {
reader.close();
+ isClosed = true;
FileIOUtils.deleteFileOrDirectory(localFile);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java
new file mode 100644
index 000000000..977d7ab9f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.BiFunctionWithIOE;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+/** Utils for lookup. */
+public class LookupUtils {
+
+ public static <T> T lookup(
+ Levels levels,
+ InternalRow key,
+ int startLevel,
+ BiFunctionWithIOE<InternalRow, SortedRun, T> lookup)
+ throws IOException {
+ if (startLevel == 0) {
+ throw new IllegalArgumentException("Start level can not be zero.");
+ }
+
+ T result = null;
+ for (int i = startLevel; i < levels.numberOfLevels(); i++) {
+ SortedRun level = levels.runOfLevel(i);
+ result = lookup.apply(key, level);
+ if (result != null) {
+ break;
+ }
+ }
+
+ return result;
+ }
+
+ public static <T> T lookup(
+ Comparator<InternalRow> keyComparator,
+ InternalRow target,
+ SortedRun level,
+ BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup)
+ throws IOException {
+ List<DataFileMeta> files = level.files();
+ int left = 0;
+ int right = files.size() - 1;
+
+ // binary search restart positions to find the restart position
immediately before the
+ // targetKey
+ while (left < right) {
+ int mid = (left + right) / 2;
+
+ if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
+ // Key at "mid.max" is < "target". Therefore all
+ // files at or before "mid" are uninteresting.
+ left = mid + 1;
+ } else {
+ // Key at "mid.max" is >= "target". Therefore all files
+ // after "mid" are uninteresting.
+ right = mid;
+ }
+ }
+
+ int index = right;
+
+ // if the index is now pointing to the last file, check if the largest
key in the block is
+ // than the target key. If so, we need to seek beyond the end of this
file
+ if (index == files.size() - 1
+ && keyComparator.compare(files.get(index).maxKey(), target) <
0) {
+ index++;
+ }
+
+ // if files does not have a next, it means the key does not exist in
this level
+ return index < files.size() ? lookup.apply(target, files.get(index)) :
null;
+ }
+
+ public static int fileKibiBytes(File file) {
+ long kibiBytes = file.length() >> 10;
+ if (kibiBytes > Integer.MAX_VALUE) {
+ throw new RuntimeException(
+ "Lookup file is too big: " +
MemorySize.ofKibiBytes(kibiBytes));
+ }
+ return (int) kibiBytes;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 84d884483..efdca7944 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -62,6 +62,23 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
protected abstract MergeFunctionWrapper<ChangelogResult>
createMergeWrapper(int outputLevel);
+ protected boolean rewriteLookupChangelog(int outputLevel,
List<List<SortedRun>> sections) {
+ if (outputLevel == 0) {
+ return false;
+ }
+
+ for (List<SortedRun> runs : sections) {
+ for (SortedRun run : runs) {
+ for (DataFileMeta file : run.files()) {
+ if (file.level() == 0) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public CompactResult rewrite(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) throws Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
similarity index 51%
copy from
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
copy to
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
index d0c710590..d79cef325 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeTreeCompactRewriter.java
@@ -19,30 +19,34 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.ContainsLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.Filter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/**
- * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup
for the compaction
- * involving level 0 files.
+ * A {@link MergeTreeCompactRewriter} for first row merge engine which
produces changelog files by
+ * contains for the compaction involving level 0 files.
*/
-public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter
{
+public class FirstRowMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
- private final LookupLevels lookupLevels;
+ private final ContainsLevels containsLevels;
- public LookupMergeTreeCompactRewriter(
- LookupLevels lookupLevels,
+ public FirstRowMergeTreeCompactRewriter(
+ ContainsLevels containsLevels,
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@@ -58,26 +62,13 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
mergeSorter,
valueEqualiser,
changelogRowDeduplicate);
- this.lookupLevels = lookupLevels;
+ this.containsLevels = containsLevels;
}
@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
- if (outputLevel == 0) {
- return false;
- }
-
- for (List<SortedRun> runs : sections) {
- for (SortedRun run : runs) {
- for (DataFileMeta file : run.files()) {
- if (file.level() == 0) {
- return true;
- }
- }
- }
- }
- return false;
+ return rewriteLookupChangelog(outputLevel, sections);
}
@Override
@@ -87,21 +78,66 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
- return new LookupChangelogMergeFunctionWrapper(
+ return new FistRowMergeFunctionWrapper(
mfFactory,
key -> {
try {
- return lookupLevels.lookup(key, outputLevel + 1);
+ return containsLevels.contains(key, outputLevel + 1);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- },
- valueEqualiser,
- changelogRowDeduplicate);
+ });
}
@Override
public void close() throws IOException {
- lookupLevels.close();
+ containsLevels.close();
+ }
+
+ @VisibleForTesting
+ static class FistRowMergeFunctionWrapper implements
MergeFunctionWrapper<ChangelogResult> {
+
+ private final Filter<InternalRow> contains;
+ private final FirstRowMergeFunction mergeFunction;
+ private final ChangelogResult reusedResult = new ChangelogResult();
+
+ public FistRowMergeFunctionWrapper(
+ MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Filter<InternalRow> contains) {
+ this.contains = contains;
+ MergeFunction<KeyValue> mergeFunction =
mergeFunctionFactory.create();
+ checkArgument(
+ mergeFunction instanceof FirstRowMergeFunction,
+ "Merge function should be a FirstRowMergeFunction, but is
%s, there is a bug.",
+ mergeFunction.getClass().getName());
+ this.mergeFunction = (FirstRowMergeFunction) mergeFunction;
+ }
+
+ @Override
+ public void reset() {
+ mergeFunction.reset();
+ }
+
+ @Override
+ public void add(KeyValue kv) {
+ mergeFunction.add(kv);
+ }
+
+ @Override
+ public ChangelogResult getResult() {
+ reusedResult.reset();
+ KeyValue result = mergeFunction.getResult();
+ checkArgument(result != null);
+ if (contains.test(result.key())) {
+ // empty
+ return reusedResult;
+ }
+
+ reusedResult.setResult(result);
+ if (result.level() == 0) {
+ // new record, output changelog
+ return reusedResult.addChangelog(result);
+ }
+ return reusedResult;
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index 86eb16698..0b1e41450 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -41,7 +41,6 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
private final int maxLevel;
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
- private final boolean isFirstRow;
// only full compaction will write files into maxLevel, see
UniversalCompaction class
private KeyValue topLevelKv;
@@ -62,7 +61,6 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
"Value count merge function does not need to produce changelog
from full compaction. "
+ "Please set changelog producer to 'input'.");
this.mergeFunction = mergeFunction;
- this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
this.maxLevel = maxLevel;
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
@@ -110,10 +108,6 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
reusedResult.addChangelog(replace(reusedAfter,
RowKind.INSERT, merged));
}
} else {
- // For first row, we should just return old value. And produce
no changelog.
- if (isFirstRow) {
- return reusedResult.setResultIfNotRetract(merged);
- }
if (merged == null || !isAdd(merged)) {
reusedResult.addChangelog(replace(reusedBefore,
RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index e862789b2..e405d33cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -53,7 +53,6 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
private final KeyValue reusedAfter = new KeyValue();
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
- private final boolean isFirstRow;
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -66,7 +65,6 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
"Merge function should be a LookupMergeFunction, but is %s,
there is a bug.",
mergeFunction.getClass().getName());
this.mergeFunction = (LookupMergeFunction) mergeFunction;
- this.isFirstRow = this.mergeFunction.isFirstRow;
this.mergeFunction2 = mergeFunctionFactory.create();
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
@@ -100,9 +98,7 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
// 2. With level 0, with the latest high level, return changelog
if (highLevel != null) {
// For first row, we should just return old value. And produce no
changelog.
- if (!isFirstRow) {
- setChangelog(highLevel, result);
- }
+ setChangelog(highLevel, result);
return reusedResult.setResult(result);
}
@@ -114,9 +110,7 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
mergeFunction2.add(highLevel);
mergeFunction2.add(result);
result = mergeFunction2.getResult();
- if (!isFirstRow) {
- setChangelog(highLevel, result);
- }
+ setChangelog(highLevel, result);
} else {
setChangelog(null, result);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index fe8056e4c..64fc78045 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -41,12 +41,10 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
KeyValue highLevel;
boolean containLevel0;
- protected final boolean isFirstRow;
public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType
valueType) {
this.mergeFunction = mergeFunction;
- this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
}
@@ -66,9 +64,7 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
- // For the first row, the candidates should in the highest level.
- Iterator<KeyValue> descending =
- isFirstRow ? candidates.iterator() :
candidates.descendingIterator();
+ Iterator<KeyValue> descending = candidates.descendingIterator();
while (descending.hasNext()) {
KeyValue kv = descending.next();
if (kv.level() > 0) {
@@ -90,6 +86,11 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
public static MergeFunctionFactory<KeyValue> wrap(
MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType
valueType) {
+ if (wrapped.create() instanceof FirstRowMergeFunction) {
+ // don't wrap first row, it is already OK
+ return wrapped;
+ }
+
return new Factory(wrapped, keyType, valueType);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index d0c710590..e5a13b1c6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -64,20 +64,7 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
- if (outputLevel == 0) {
- return false;
- }
-
- for (List<SortedRun> runs : sections) {
- for (SortedRun run : runs) {
- for (DataFileMeta file : run.files()) {
- if (file.level() == 0) {
- return true;
- }
- }
- }
- }
- return false;
+ return rewriteLookupChangelog(outputLevel, sections);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 176914738..ffd7c6081 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -35,12 +35,14 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.mergetree.ContainsLevels;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.FirstRowMergeTreeCompactRewriter;
import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupCompaction;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
@@ -218,6 +220,23 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
valueEqualiserSupplier.get(),
options.changelogRowDeduplicate());
case LOOKUP:
+ if (options.mergeEngine() ==
CoreOptions.MergeEngine.FIRST_ROW) {
+ KeyValueFileReaderFactory keyOnlyReader =
+ readerFactoryBuilder
+ .copyWithoutProjection()
+ .withValueProjection(new int[0][])
+ .build(partition, bucket);
+ ContainsLevels containsLevels =
createContainsLevels(levels, keyOnlyReader);
+ return new FirstRowMergeTreeCompactRewriter(
+ containsLevels,
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ mfFactory,
+ mergeSorter,
+ valueEqualiserSupplier.get(),
+ options.changelogRowDeduplicate());
+ }
LookupLevels lookupLevels = createLookupLevels(levels,
readerFactory);
return new LookupMergeTreeCompactRewriter(
lookupLevels,
@@ -255,4 +274,25 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}
+
+ private ContainsLevels createContainsLevels(
+ Levels levels, KeyValueFileReaderFactory readerFactory) {
+ if (ioManager == null) {
+ throw new RuntimeException(
+ "Can not use lookup, there is no temp disk directory to
use.");
+ }
+ return new ContainsLevels(
+ levels,
+ keyComparatorSupplier.get(),
+ keyType,
+ file ->
+ readerFactory.createRecordReader(
+ file.schemaId(), file.fileName(),
file.level()),
+ () -> ioManager.createChannel().getPathFile(),
+ new HashLookupStoreFactory(
+ cacheManager,
+
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 72ea13649..37f779d35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -19,6 +19,7 @@
package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.WriteMode;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
@@ -82,8 +83,9 @@ public class SchemaValidation {
validateStartupMode(options);
+ ChangelogProducer changelogProducer = options.changelogProducer();
if (options.writeMode() == WriteMode.APPEND_ONLY
- && options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE) {
+ && changelogProducer != ChangelogProducer.NONE) {
throw new UnsupportedOperationException(
String.format(
"Can not set the %s to %s and %s at the same
time.",
@@ -100,16 +102,15 @@ public class SchemaValidation {
+ SNAPSHOT_NUM_RETAINED_MAX.key());
// Only changelog tables with primary keys support full compaction or
lookup
- // changelog
- // producer
+ // changelog producer
if (options.writeMode() == WriteMode.CHANGE_LOG) {
- switch (options.changelogProducer()) {
+ switch (changelogProducer) {
case FULL_COMPACTION:
case LOOKUP:
if (schema.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
"Changelog table with "
- + options.changelogProducer()
+ + changelogProducer
+ " must have primary keys");
}
break;
@@ -173,9 +174,16 @@ public class SchemaValidation {
field));
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW &&
sequenceField.isPresent()) {
- throw new IllegalArgumentException(
- "Do not support use sequence field on FIRST_MERGE merge
engine");
+ if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
+ if (sequenceField.isPresent()) {
+ throw new IllegalArgumentException(
+ "Do not support use sequence field on FIRST_MERGE
merge engine");
+ }
+
+ if (changelogProducer != ChangelogProducer.LOOKUP) {
+ throw new IllegalArgumentException(
+ "Only support 'lookup' changelog-producer on
FIRST_MERGE merge engine");
+ }
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
similarity index 78%
copy from
paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
copy to
paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 079ebce31..424fe9315 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -60,12 +60,11 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
-import static org.apache.paimon.KeyValue.UNKNOWN_SEQUENCE;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test {@link LookupLevels}. */
-public class LookupLevelsTest {
+/** Test {@link ContainsLevels}. */
+public class ContainsLevelsTest {
private static final String LOOKUP_FILE_PREFIX = "lookup-";
@@ -88,35 +87,22 @@ public class LookupLevelsTest {
newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
newFile(2, kv(2, 22), kv(5, 55))),
3);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ ContainsLevels containsLevels = createContainsLevels(levels,
MemorySize.ofMebiBytes(10));
// only in level 1
- KeyValue kv = lookupLevels.lookup(row(1), 1);
- assertThat(kv).isNotNull();
- assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
- assertThat(kv.level()).isEqualTo(1);
- assertThat(kv.value().getInt(1)).isEqualTo(11);
+ assertThat(containsLevels.contains(row(1), 1)).isTrue();
// only in level 2
- kv = lookupLevels.lookup(row(2), 1);
- assertThat(kv).isNotNull();
- assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
- assertThat(kv.level()).isEqualTo(2);
- assertThat(kv.value().getInt(1)).isEqualTo(22);
+ assertThat(containsLevels.contains(row(2), 1)).isTrue();
// both in level 1 and level 2
- kv = lookupLevels.lookup(row(5), 1);
- assertThat(kv).isNotNull();
- assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
- assertThat(kv.level()).isEqualTo(1);
- assertThat(kv.value().getInt(1)).isEqualTo(5);
+ assertThat(containsLevels.contains(row(5), 1)).isTrue();
        // does not exist
- kv = lookupLevels.lookup(row(4), 1);
- assertThat(kv).isNull();
+ assertThat(containsLevels.contains(row(4), 1)).isFalse();
- lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ containsLevels.close();
+
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
}
@Test
@@ -130,7 +116,7 @@ public class LookupLevelsTest {
newFile(1, kv(7, 77), kv(8, 88)),
newFile(1, kv(10, 1010), kv(11, 1111))),
1);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+ ContainsLevels containsLevels = createContainsLevels(levels,
MemorySize.ofMebiBytes(10));
Map<Integer, Integer> contains =
new HashMap<Integer, Integer>() {
@@ -146,28 +132,23 @@ public class LookupLevelsTest {
}
};
for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
- KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
- assertThat(kv).isNotNull();
- assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
- assertThat(kv.level()).isEqualTo(1);
- assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
+ assertThat(containsLevels.contains(row(entry.getKey()),
1)).isTrue();
}
int[] notContains = new int[] {0, 3, 6, 9, 12};
for (int key : notContains) {
- KeyValue kv = lookupLevels.lookup(row(key), 1);
- assertThat(kv).isNull();
+ assertThat(containsLevels.contains(row(key), 1)).isFalse();
}
- lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ containsLevels.close();
+
assertThat(containsLevels.containsFiles().estimatedSize()).isEqualTo(0);
}
@Test
public void testMaxDiskSize() throws IOException {
List<DataFileMeta> files = new ArrayList<>();
int fileNum = 10;
- int recordInFile = 100;
+ int recordInFile = 1000;
for (int i = 0; i < fileNum; i++) {
List<KeyValue> kvs = new ArrayList<>();
for (int j = 0; j < recordInFile; j++) {
@@ -177,33 +158,28 @@ public class LookupLevelsTest {
files.add(newFile(1, kvs.toArray(new KeyValue[0])));
}
Levels levels = new Levels(comparator, files, 1);
- LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofKibiBytes(20));
+ ContainsLevels lookupLevels = createContainsLevels(levels,
MemorySize.ofKibiBytes(50));
for (int i = 0; i < fileNum * recordInFile; i++) {
- KeyValue kv = lookupLevels.lookup(row(i), 1);
- assertThat(kv).isNotNull();
- assertThat(kv.sequenceNumber()).isEqualTo(UNKNOWN_SEQUENCE);
- assertThat(kv.level()).isEqualTo(1);
- assertThat(kv.value().getInt(1)).isEqualTo(i);
+ assertThat(lookupLevels.contains(row(i), 1)).isTrue();
}
        // some files are invalidated
- long fileNumber = lookupLevels.lookupFiles().size();
+ long fileNumber = lookupLevels.containsFiles().estimatedSize();
String[] lookupFiles =
tempDir.toFile().list((dir, name) ->
name.startsWith(LOOKUP_FILE_PREFIX));
assertThat(lookupFiles).isNotNull();
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ assertThat(lookupLevels.containsFiles().estimatedSize()).isEqualTo(0);
}
- private LookupLevels createLookupLevels(Levels levels, MemorySize
maxDiskSize) {
- return new LookupLevels(
+ private ContainsLevels createContainsLevels(Levels levels, MemorySize
maxDiskSize) {
+ return new ContainsLevels(
levels,
comparator,
keyType,
- rowType,
file -> createReaderFactory().createRecordReader(0,
file.fileName(), file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(new CacheManager(2048,
MemorySize.ofMebiBytes(1)), 0.75),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 079ebce31..ef77b75e0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -116,7 +116,7 @@ public class LookupLevelsTest {
assertThat(kv).isNull();
lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
@Test
@@ -160,7 +160,7 @@ public class LookupLevelsTest {
}
lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
@Test
@@ -188,14 +188,14 @@ public class LookupLevelsTest {
}
        // some files are invalidated
- long fileNumber = lookupLevels.lookupFiles().size();
+ long fileNumber = lookupLevels.lookupFiles().estimatedSize();
String[] lookupFiles =
tempDir.toFile().list((dir, name) ->
name.startsWith(LOOKUP_FILE_PREFIX));
assertThat(lookupFiles).isNotNull();
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
lookupLevels.close();
- assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}
private LookupLevels createLookupLevels(Levels levels, MemorySize
maxDiskSize) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index bddd37111..70e361f87 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -20,9 +20,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -40,7 +38,7 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
private static final int MAX_LEVEL = 3;
private static final RecordEqualiser EQUALISER =
- (RecordEqualiser) (row1, row2) -> row1.getInt(0) == row2.getInt(0);
+ (row1, row2) -> row1.getInt(0) == row2.getInt(0);
protected FullChangelogMergeFunctionWrapper wrapper;
@@ -48,10 +46,6 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
protected abstract boolean changelogRowDeduplicate();
- protected List<List<KeyValue>> getInputKvs() {
- return INPUT_KVS;
- }
-
@BeforeEach
public void beforeEach() {
wrapper =
@@ -113,10 +107,9 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
@Test
public void testFullChangelogMergeFunctionWrapper() {
- List<List<KeyValue>> inputs = getInputKvs();
- for (int i = 0; i < inputs.size(); i++) {
+ for (int i = 0; i < INPUT_KVS.size(); i++) {
wrapper.reset();
- List<KeyValue> kvs = inputs.get(i);
+ List<KeyValue> kvs = INPUT_KVS.get(i);
kvs.forEach(kv -> wrapper.add(kv));
ChangelogResult actualResult = wrapper.getResult();
List<KeyValue> expectedChangelogs = new ArrayList<>();
@@ -222,113 +215,4 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
return true;
}
}
-
- /** Test for {@link FirstRowMergeFunction} with {@link
FullChangelogMergeFunctionWrapper}. */
- public static class FirstRowMergeFunctionTest
- extends FullChangelogMergeFunctionWrapperTestBase {
-
- private static final List<List<KeyValue>> INPUT_KVS =
- Arrays.asList(
- // only 1 insert record, not from top level
- Collections.singletonList(
- new KeyValue()
- .replace(row(1), 1, RowKind.INSERT,
row(1))
- .setLevel(0)),
- // only 1 delete record, not from top level
- Collections.singletonList(
- new KeyValue()
- .replace(row(2), 2, RowKind.DELETE,
row(0))
- .setLevel(0)),
- // only 1 insert record, from top level
- Collections.singletonList(
- new KeyValue()
- .replace(row(3), 3, RowKind.INSERT,
row(3))
- .setLevel(MAX_LEVEL)),
- // multiple records, none from top level
- Arrays.asList(
- new KeyValue()
- .replace(row(4), 4, RowKind.INSERT,
row(3))
- .setLevel(0),
- new KeyValue()
- .replace(row(4), 5, RowKind.INSERT,
row(-3))
- .setLevel(0)),
- // multiple records, one from top level
- Arrays.asList(
- new KeyValue()
- .replace(row(6), 8, RowKind.INSERT,
row(3))
- .setLevel(MAX_LEVEL),
- new KeyValue()
- .replace(row(6), 9, RowKind.INSERT,
row(-3))
- .setLevel(0)),
- Arrays.asList(
- new KeyValue()
- .replace(row(8), 14, RowKind.INSERT,
row(3))
- .setLevel(MAX_LEVEL),
- new KeyValue()
- .replace(row(8), 15, RowKind.INSERT,
row(3))
- .setLevel(0)));
-
- @Override
- protected List<List<KeyValue>> getInputKvs() {
- return INPUT_KVS;
- }
-
- private final List<KeyValue> expectedBefore =
- Arrays.asList(
- null,
- null,
- null,
- null,
- null,
- changelogRowDeduplicate()
- ? null
- : new KeyValue()
- .replace(row(8), 14,
RowKind.UPDATE_BEFORE, row(3)));
-
- private final List<KeyValue> expectedAfter =
- Arrays.asList(
- new KeyValue().replace(row(1), 1, RowKind.INSERT,
row(1)),
- null,
- null,
- new KeyValue().replace(row(4), 4, RowKind.INSERT,
row(3)),
- null,
- changelogRowDeduplicate()
- ? null
- : new KeyValue().replace(row(8), 15,
RowKind.UPDATE_AFTER, row(3)));
-
- private final List<KeyValue> expectedResult =
- Arrays.asList(
- new KeyValue().replace(row(1), 1, RowKind.INSERT,
row(1)),
- null,
- new KeyValue().replace(row(3), 3, RowKind.INSERT,
row(3)),
- new KeyValue().replace(row(4), 4, RowKind.INSERT,
row(3)),
- new KeyValue().replace(row(6), 8, RowKind.INSERT,
row(3)),
- new KeyValue().replace(row(8), 14, RowKind.INSERT,
row(3)));
-
- @Override
- protected MergeFunction<KeyValue> createMergeFunction() {
- return new FirstRowMergeFunction(
- RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT()));
- }
-
- @Override
- protected boolean changelogRowDeduplicate() {
- return true;
- }
-
- @Override
- protected KeyValue getExpectedBefore(int idx) {
- return expectedBefore.get(idx);
- }
-
- @Override
- protected KeyValue getExpectedAfter(int idx) {
- return expectedAfter.get(idx);
- }
-
- @Override
- protected KeyValue getExpectedResult(int idx) {
- return expectedResult.get(idx);
- }
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 892446da6..5cf976313 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -36,8 +36,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.types.RowKind.DELETE;
@@ -50,7 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class LookupChangelogMergeFunctionWrapperTest {
private static final RecordEqualiser EQUALISER =
- (RecordEqualiser) (row1, row2) -> row1.getInt(0) == row2.getInt(0);
+ (row1, row2) -> row1.getInt(0) == row2.getInt(0);
@ParameterizedTest
@ValueSource(booleans = {false, true})
@@ -290,25 +292,16 @@ public class LookupChangelogMergeFunctionWrapperTest {
@Test
public void testFirstRow() {
- Map<InternalRow, KeyValue> highLevel = new HashMap<>();
- LookupChangelogMergeFunctionWrapper function =
- new LookupChangelogMergeFunctionWrapper(
- LookupMergeFunction.wrap(
- projection ->
- new FirstRowMergeFunction(
- new RowType(
- Lists.list(
- new DataField(
- 0,
"f0", new IntType()))),
- new RowType(
- Lists.list(
- new DataField(
- 1,
"f1", new IntType())))),
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.INT())),
- highLevel::get,
- EQUALISER,
- false);
+ Set<InternalRow> highLevel = new HashSet<>();
+ FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper function =
+ new
FirstRowMergeTreeCompactRewriter.FistRowMergeFunctionWrapper(
+ projection ->
+ new FirstRowMergeFunction(
+ new RowType(
+ Lists.list(new DataField(0,
"f0", new IntType()))),
+ new RowType(
+ Lists.list(new DataField(1,
"f1", new IntType())))),
+ highLevel::contains);
// Without level-0
function.reset();
@@ -361,7 +354,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
// with high level value
function.reset();
- highLevel.put(row(1), new KeyValue().replace(row(1), INSERT, row(10)));
+ highLevel.add(row(1));
function.add(new KeyValue().replace(row(1), 2, INSERT,
row(0)).setLevel(0));
result = function.getResult();
@@ -369,7 +362,6 @@ public class LookupChangelogMergeFunctionWrapperTest {
changelogs = result.changelogs();
assertThat(changelogs).hasSize(0);
kv = result.result();
- assertThat(kv).isNotNull();
- assertThat(kv.value().getInt(0)).isEqualTo(10);
+ assertThat(kv).isNull();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index a12c5d152..ea4607214 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -160,9 +160,10 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
- return new FirstRowMergeFunction(
- new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
- new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))));
+ RowType keyType = new RowType(Lists.list(new DataField(0, "f0",
new IntType())));
+ RowType valueType = new RowType(Lists.list(new DataField(1, "f1",
new BigIntType())));
+ return new LookupMergeFunction(
+ new FirstRowMergeFunction(keyType, valueType), keyType,
valueType);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index d549954a7..4c4020343 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -195,6 +195,32 @@ public class UniversalCompactionTest {
.isEqualTo(new long[] {27});
}
+ @Test
+ public void testLookup() {
+ LookupCompaction compaction = new LookupCompaction(new
UniversalCompaction(25, 1, 3));
+
+ // level 0 to max level
+ Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 2, 2));
+ assertThat(pick.isPresent()).isTrue();
+ long[] results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 2, 2, 2});
+ assertThat(pick.get().outputLevel()).isEqualTo(2);
+
+ // level 0 force pick
+ pick = compaction.pick(3, Arrays.asList(level(0, 1), level(1, 2),
level(2, 2)));
+ assertThat(pick.isPresent()).isTrue();
+ results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 2, 2});
+ assertThat(pick.get().outputLevel()).isEqualTo(2);
+
+ // level 0 to empty level
+ pick = compaction.pick(3, Arrays.asList(level(0, 1), level(2, 2)));
+ assertThat(pick.isPresent()).isTrue();
+ results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1});
+ assertThat(pick.get().outputLevel()).isEqualTo(1);
+ }
+
private List<LevelSortedRun> createLevels(int... levels) {
List<LevelSortedRun> runs = new ArrayList<>();
for (int size : levels) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
index e1e7a7efe..f2ee7ee43 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java
@@ -28,7 +28,6 @@ import org.rocksdb.RocksDBException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/** RocksDB state for key -> List of value. */
public class RocksDBListState extends RocksDBState<List<InternalRow>> {
@@ -59,21 +58,22 @@ public class RocksDBListState extends
RocksDBState<List<InternalRow>> {
public List<InternalRow> get(InternalRow key) throws IOException {
byte[] keyBytes = serializeKey(key);
- try {
- return cache.get(
- wrap(keyBytes),
- () -> {
- byte[] valueBytes = db.get(columnFamily, keyBytes);
- List<InternalRow> rows =
- listSerializer.deserializeList(valueBytes,
valueSerializer);
- if (rows == null) {
- return EMPTY;
- }
- return rows;
- });
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
+ return cache.get(
+ wrap(keyBytes),
+ k -> {
+ byte[] valueBytes;
+ try {
+ valueBytes = db.get(columnFamily, keyBytes);
+ } catch (RocksDBException e) {
+ throw new RuntimeException(e);
+ }
+ List<InternalRow> rows =
+ listSerializer.deserializeList(valueBytes,
valueSerializer);
+ if (rows == null) {
+ return EMPTY;
+ }
+ return rows;
+ });
}
private byte[] serializeValue(InternalRow value) throws IOException {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 0e47b6c38..7269d1234 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -23,8 +23,9 @@ import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
-import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
-import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
@@ -70,7 +71,11 @@ public abstract class RocksDBState<CacheV> {
this.valueInputView = new DataInputDeserializer();
this.valueOutputView = new DataOutputSerializer(32);
this.writeOptions = new WriteOptions().setDisableWAL(true);
- this.cache =
CacheBuilder.newBuilder().maximumSize(lruCacheSize).build();
+ this.cache =
+ Caffeine.newBuilder()
+ .maximumSize(lruCacheSize)
+ .executor(MoreExecutors.directExecutor())
+ .build();
}
protected byte[] serializeKey(InternalRow key) throws IOException {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index cd625b5ac..12af57fea 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -26,81 +26,66 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for first row merge engine. */
public class FirstRowITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
- return Arrays.asList(
+ return Collections.singletonList(
                 "CREATE TABLE IF NOT EXISTS T ("
                         + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
-                        + " WITH ('merge-engine'='first-row');",
-                "CREATE TABLE IF NOT EXISTS T1 ("
-                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
-                        + " WITH ('merge-engine'='first-row', 'changelog-producer' = 'lookup');",
-                "CREATE TABLE IF NOT EXISTS T2 ("
-                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
-                        + " WITH ('merge-engine'='first-row', 'changelog-producer' = 'full-compaction', 'full-compaction.delta-commits' = '3');");
+                        + " WITH ('merge-engine'='first-row', 'file.format'='avro', 'changelog-producer' = 'lookup');");
}
@Test
-    public void testBatchQuery() {
-        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
-        List<Row> result = batchSql("SELECT * FROM T");
-
-        assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "1"));
-
-        result = batchSql("SELECT c FROM T");
-
-        assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
+    public void testIllegal() {
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE ILLEGAL_T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+                                                + " WITH ('merge-engine'='first-row')"))
+                .hasRootCauseMessage(
+                        "Only support 'lookup' changelog-producer on FIRST_MERGE merge engine");
     }
@Test
-    public void testReadAfterFullCompaction() {
-        batchSql("ALTER TABLE T SET ('full-compaction.delta-commits'='1')");
-
+    public void testBatchQuery() {
         batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
         List<Row> result = batchSql("SELECT * FROM T");
         assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, "1"));
-        batchSql("INSERT INTO T VALUES (1, 1, '1'), (2, 2, '2')");
-        result = batchSql("SELECT * FROM T");
-        assertThat(result)
-                .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
-                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+        result = batchSql("SELECT c FROM T");
+
+        assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
}
@Test
-    public void testStreamingReadOnFullCompaction() throws Exception {
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T2");
+    public void testStreamingRead() throws Exception {
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
 
-        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 4, '4')");
+        sql("INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 4, '4')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(
                         Row.ofKind(RowKind.INSERT, 1, 1, "1"),
                         Row.ofKind(RowKind.INSERT, 2, 2, "2"));
 
-        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 3, '3')");
-        assertThat(iterator.collect(1))
-                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, "3"));
-    }
-
-    @Test
-    public void testStreamingReadOnLookup() throws Exception {
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T1");
-
-        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 4, '4')");
-        assertThat(iterator.collect(2))
+        sql(
+                "INSERT INTO T VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 3, '3'), (4, 4, '4'), (5, 5, '5'), (6, 6, '6'), (7, 7, '7')");
+        assertThat(iterator.collect(5))
                 .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
-                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+                        Row.ofKind(RowKind.INSERT, 3, 3, "3"),
+                        Row.ofKind(RowKind.INSERT, 4, 4, "4"),
+                        Row.ofKind(RowKind.INSERT, 5, 5, "5"),
+                        Row.ofKind(RowKind.INSERT, 6, 6, "6"),
+                        Row.ofKind(RowKind.INSERT, 7, 7, "7"));
 
-        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 3, '3')");
+        sql("INSERT INTO T VALUES(7, 7, '8'), (8, 8, '8')");
         assertThat(iterator.collect(1))
-                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, "3"));
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 8, 8, "8"));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 0c7636efb..d89f5860e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -72,8 +72,11 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1482,6 +1485,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ if (mergeEngine == FIRST_ROW) {
+ options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+ }
String table =
createTable(
Arrays.asList(
@@ -1576,6 +1582,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
         options.put(
                 CoreOptions.FIELDS_PREFIX + ".rate." + CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ if (mergeEngine == FIRST_ROW) {
+ options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+ }
String table =
createTable(
Arrays.asList(
@@ -1656,6 +1665,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ if (mergeEngine == FIRST_ROW) {
+ options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+ }
String table =
createTable(
Arrays.asList(
@@ -1746,6 +1758,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ if (mergeEngine == FIRST_ROW) {
+ options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+ }
String table =
createTable(
Arrays.asList(
@@ -1824,6 +1839,9 @@ public class ReadWriteTableITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ if (mergeEngine == FIRST_ROW) {
+ options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
+ }
String table =
createTable(
Arrays.asList(