JingsongLi commented on code in PR #2958:
URL: https://github.com/apache/incubator-paimon/pull/2958#discussion_r1515497048
##########
paimon-common/src/main/java/org/apache/paimon/CoreOptions.java:
##########
@@ -1371,6 +1380,48 @@ public ChangelogProducer changelogProducer() {
return options.get(CHANGELOG_PRODUCER);
}
+ public boolean requireLookup() {
+ return !lookupStrategy().equals(LookupStrategy.NO_LOOKUP);
+ }
+
+ public LookupStrategy lookupStrategy() {
+ return LookupStrategy.from(
+
options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP),
+ deletionVectorsEnabled());
+ }
+
+ /** Strategy for lookup. */
+ public enum LookupStrategy {
+ NO_LOOKUP(false, false),
+ CHANGELOG_ONLY(true, false),
+ DELETION_VECTOR_ONLY(false, true),
+ CHANGELOG_AND_DELETION_VECTOR(true, true);
+
+ // write changelog
+ public final boolean changelog;
+
+ // write deletion vector
+ public final boolean deletionVector;
+
+ LookupStrategy(boolean changelog, boolean deletionVector) {
Review Comment:
boolean needLookup, boolean produceChangelog, boolean deletionVector
##########
paimon-common/src/main/java/org/apache/paimon/CoreOptions.java:
##########
@@ -1371,6 +1380,48 @@ public ChangelogProducer changelogProducer() {
return options.get(CHANGELOG_PRODUCER);
}
+ public boolean requireLookup() {
+ return !lookupStrategy().equals(LookupStrategy.NO_LOOKUP);
+ }
+
+ public LookupStrategy lookupStrategy() {
Review Comment:
Move this method and `LookupStrategy` into a separate class.
`CoreOptions` is a public API; these are internal classes.
##########
paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java:
##########
@@ -230,7 +238,15 @@ public interface ValueProcessor<T> {
byte[] persistToDisk(KeyValue kv);
- T readFromDisk(InternalRow key, int level, byte[] valueBytes);
+ default boolean withPosition() {
Review Comment:
It is better not to use a `default` method here.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java:
##########
@@ -102,8 +103,9 @@ private StoreSinkWrite.Provider createWriteProvider(
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
waitCompaction =
- changelogProducer == ChangelogProducer.LOOKUP
- && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+ (changelogProducer == ChangelogProducer.LOOKUP
+ &&
options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+ || options.get(DELETION_VECTORS_ENABLED);
Review Comment:
If `CHANGELOG_PRODUCER_LOOKUP_WAIT` is true, can we just not wait?
##########
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java:
##########
@@ -228,68 +256,78 @@ private MergeTreeCompactRewriter createRewriter(
int bucket,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
- Levels levels) {
+ Levels levels,
+ @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ if (deletionVectorsMaintainer != null) {
+ readerFactoryBuilder.withDeletionVectorSupplier(
+ deletionVectorsMaintainer::deletionVectorOf);
+ }
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType,
ioManager);
int maxLevel = options.numLevels() - 1;
- CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- switch (options.changelogProducer()) {
- case FULL_COMPACTION:
- return new FullChangelogMergeTreeCompactRewriter(
- maxLevel,
- mergeEngine,
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter,
- valueEqualiserSupplier.get(),
- options.changelogRowDeduplicate());
- case LOOKUP:
- if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
- KeyValueFileReaderFactory keyOnlyReader =
- readerFactoryBuilder
- .copyWithoutProjection()
- .withValueProjection(new int[0][])
- .build(partition, bucket);
- return new LookupMergeTreeCompactRewriter<>(
- maxLevel,
- mergeEngine,
- createLookupLevels(levels, new
ContainsValueProcessor(), keyOnlyReader),
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter,
- new FirstRowMergeFunctionWrapperFactory());
- } else {
- return new LookupMergeTreeCompactRewriter<>(
- maxLevel,
- mergeEngine,
- createLookupLevels(
- levels, new KeyValueProcessor(valueType),
readerFactory),
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter,
- new LookupMergeFunctionWrapperFactory(
- valueEqualiserSupplier.get(),
- options.changelogRowDeduplicate()));
- }
- default:
- return new MergeTreeCompactRewriter(
- readerFactory,
- writerFactory,
- keyComparator,
- userDefinedSeqComparator,
- mfFactory,
- mergeSorter);
+ MergeEngine mergeEngine = options.mergeEngine();
+ ChangelogProducer changelogProducer = options.changelogProducer();
+ if (changelogProducer.equals(FULL_COMPACTION)) {
+ return new FullChangelogMergeTreeCompactRewriter(
+ maxLevel,
+ mergeEngine,
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ userDefinedSeqComparator,
+ mfFactory,
+ mergeSorter,
+ valueEqualiserSupplier.get(),
+ options.changelogRowDeduplicate());
+ } else if (options.requireLookup()) {
+ LookupStrategy lookupStrategy = options.lookupStrategy();
+ LookupLevels.ValueProcessor<?> processor;
+ LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?>
wrapperFactory;
+ KeyValueFileReaderFactory lookupReaderFactory = readerFactory;
+ if (mergeEngine == FIRST_ROW) {
+ lookupReaderFactory =
Review Comment:
Should we throw an unsupported exception here for deletion.vectors?
##########
paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java:
##########
@@ -173,7 +175,13 @@ private LookupFile createLookupFile(DataFileMeta file)
throws IOException {
while ((batch = reader.readBatch()) != null) {
while ((kv = batch.next()) != null) {
byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
- byte[] valueBytes = valueProcessor.persistToDisk(kv);
+ byte[] valueBytes =
+ valueProcessor.withPosition()
Review Comment:
It is better to use one big if/else for performance; you can wrap the
`while` loop in the if/else.
##########
paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java:
##########
@@ -254,7 +270,7 @@ public byte[] persistToDisk(KeyValue kv) {
}
@Override
- public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes)
{
+ public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes,
String ignore) {
Review Comment:
ignore -> fileName
##########
paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java:
##########
@@ -273,8 +289,93 @@ public byte[] persistToDisk(KeyValue kv) {
}
@Override
- public Boolean readFromDisk(InternalRow key, int level, byte[] bytes) {
+ public Boolean readFromDisk(InternalRow key, int level, byte[] bytes,
String ignore) {
Review Comment:
ignore -> fileName
##########
paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link RecordReader} which apply {@link DeletionVector} to filter
record. */
+public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
+
+ private final RecordReader<T> reader;
+
+ private final DeletionVector deletionVector;
+
+ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector
deletionVector) {
+ this.reader = reader;
+ this.deletionVector = deletionVector;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<T> readBatch() throws IOException {
+ RecordIterator<T> batch = reader.readBatch();
+
+ if (batch == null) {
+ return null;
+ }
+
+ checkArgument(
+ batch instanceof RecordWithPositionIterator,
+ "There is a bug, RecordIterator in ApplyDeletionVectorReader
must be RecordWithPositionIterator instead of "
Review Comment:
Please avoid string concatenation here.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java:
##########
@@ -102,8 +103,9 @@ private StoreSinkWrite.Provider createWriteProvider(
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
waitCompaction =
- changelogProducer == ChangelogProducer.LOOKUP
- && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+ (changelogProducer == ChangelogProducer.LOOKUP
+ &&
options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+ || options.get(DELETION_VECTORS_ENABLED);
Review Comment:
Can you add an ITCase for Flink?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]