JingsongLi commented on code in PR #2605: URL: https://github.com/apache/incubator-paimon/pull/2605#discussion_r1439294641
########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLRULookupTable.java: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.lookup; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormatDiscover; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.KeyValueFileReaderFactory; +import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.lookup.hash.HashLookupStoreFactory; +import org.apache.paimon.mergetree.Levels; +import org.apache.paimon.mergetree.LookupLevels; +import org.apache.paimon.schema.KeyValueFieldsExtractor; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PrimaryKeyTableUtils; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.KeyComparatorSupplier; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.Projection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE; + +/** Lookup table for primary key which supports to read the LSM tree directly. */ +public class PrimaryKeyLRULookupTable implements LookupTable { + + private static final Logger LOG = LoggerFactory.getLogger(PrimaryKeyLRULookupTable.class); + + private final Map<BinaryRow, Map<Integer, LookupLevels>> tableView; + + private final CoreOptions options; + + private final FixedBucketKeyExtractor extractor; + + private final Supplier<Comparator<InternalRow>> keyComparatorSupplier; + + private final RowType keyType; + + private final RowType projectedValueType; + + private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; + + private final File path; + + private final HashLookupStoreFactory hashLookupStoreFactory; + + public PrimaryKeyLRULookupTable(FileStoreTable table, int[] projection, File path) { + this.path = path; + this.options = new CoreOptions(table.options()); + this.tableView = new HashMap<>(); + this.extractor = new FixedBucketKeyExtractor(table.schema()); + KeyValueFieldsExtractor pkExtractor = + PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR; + this.keyType = new RowType(pkExtractor.keyFields(table.schema())); + CacheManager cacheManager = + new CacheManager( + options.pageSize(), + options.toConfiguration().get(LOOKUP_CACHE_MAX_MEMORY_SIZE)); + RowType partitionType = table.schema().logicalPartitionType(); + this.readerFactoryBuilder = + KeyValueFileReaderFactory.builder( + table.fileIO(), + new SchemaManager(table.fileIO(), table.location()), + table.schema().id(), + keyType, + table.rowType(), + FileFormatDiscover.of(options), + new FileStorePathFactory( + options.path(), + partitionType, + options.partitionDefaultName(), + options.fileFormat().getFormatIdentifier()), + PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR, + options); + readerFactoryBuilder.withValueProjection(Projection.of(projection).toNestedIndexes()); + this.projectedValueType = readerFactoryBuilder.projectedValueType(); + this.keyComparatorSupplier = new KeyComparatorSupplier(keyType); + this.hashLookupStoreFactory = + new HashLookupStoreFactory( + cacheManager, + options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)); + } + + @Override + public List<InternalRow> get(InternalRow key) throws IOException { + LookupLevels lookupLevels = getLookupLevels(key); + if (lookupLevels == null) { + return Collections.emptyList(); + } else { + // lookup start from level 1. + KeyValue kv = lookupLevels.lookup(key, 1); + if (kv == null || kv.valueKind().isRetract()) { + return Collections.emptyList(); + } else { + return Collections.singletonList(kv.value()); + } + } + } + + private LookupLevels getLookupLevels(InternalRow key) { + extractor.setRecord(key); + int bucket = extractor.bucket(); + + if (tableView.isEmpty()) { + return null; + } + // Non partitioned table + Map<Integer, LookupLevels> buckets = tableView.entrySet().iterator().next().getValue(); + return buckets.get(bucket); + } + + @Override + public void refresh(List<Split> splits) { + for (Split split : splits) { + if (!(split instanceof DataSplit)) { + throw new IllegalArgumentException("Unsupported split: " + split.getClass()); + } + BinaryRow partition = ((DataSplit) split).partition(); + int bucket = ((DataSplit) split).bucket(); + List<DataFileMeta> before = ((DataSplit) split).beforeFiles(); + List<DataFileMeta> after = ((DataSplit) split).dataFiles(); + + Map<Integer, LookupLevels> buckets = + tableView.computeIfAbsent(partition, k -> new HashMap<>()); + LookupLevels lookupLevels = buckets.get(bucket); + + if (lookupLevels == null) { + Preconditions.checkArgument( + before.isEmpty(), "The before file should be empty for the initial phase."); + Levels levels = new Levels(keyComparatorSupplier.get(), after, options.numLevels()); + + KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket); + + lookupLevels = + new LookupLevels( Review Comment: Can you add an interface for lookup? For example: 1. Add `FileStoreTable.newQuery(@Nullable int[] projection)`. 2. `PrimaryKeyFileStoreTable` override `newQuery`. ``` public interface TableQuery extends Closeable { void refreshFiles( BinaryRow partition, int bucket, List<DataFileMeta> beforeFiles, List<DataFileMeta> dataFiles); @Nullable InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException; } ``` This can be a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
