This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push: new 997fdc795 [core] Move codes of cross partition to paimon-core (#2186) 997fdc795 is described below commit 997fdc795d3705b174619214f0a0d48a48bd98c0 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Fri Oct 27 18:00:37 2023 +0800 [core] Move codes of cross partition to paimon-core (#2186) --- .../org/apache/paimon/utils/KeyValueIterator.java | 31 +++++++++++ .../paimon/utils}/ListDelimitedSerializer.java | 2 +- .../apache/paimon/utils/ProjectToRowFunction.java | 8 +-- paimon-core/pom.xml | 11 ++++ .../crosspartition}/GlobalIndexAssigner.java | 19 +++---- .../paimon/crosspartition}/IndexBootstrap.java | 2 +- .../KeyPartPartitionKeyExtractor.java | 6 +- .../apache/paimon}/lookup/RocksDBListState.java | 3 +- .../org/apache/paimon/lookup}/RocksDBOptions.java | 2 +- .../org/apache/paimon}/lookup/RocksDBSetState.java | 2 +- .../org/apache/paimon}/lookup/RocksDBState.java | 6 +- .../apache/paimon}/lookup/RocksDBStateFactory.java | 65 +++++++++++----------- .../apache/paimon}/lookup/RocksDBValueState.java | 9 ++- .../crosspartition}/GlobalIndexAssignerTest.java | 49 ++++++++-------- .../paimon/crosspartition}/IndexBootstrapTest.java | 6 +- .../configuration/ConfigOptionsDocGenerator.java | 1 + paimon-flink/paimon-flink-common/pom.xml | 7 --- .../flink/lookup/FileStoreLookupFunction.java | 5 +- .../apache/paimon/flink/lookup/LookupTable.java | 1 + .../flink/lookup/NoPrimaryKeyLookupTable.java | 2 + .../paimon/flink/lookup/PrimaryKeyLookupTable.java | 2 + .../flink/lookup/SecondaryIndexLookupTable.java | 2 + .../flink/sink/index/GlobalDynamicBucketSink.java | 3 +- .../sink/index/GlobalIndexAssignerOperator.java | 7 +-- .../flink/sink/index/IndexBootstrapOperator.java | 1 + .../flink/lookup/FileStoreLookupFunctionTest.java | 2 +- .../paimon/flink/lookup/LookupTableTest.java | 1 + .../paimon/flink/lookup/RocksDBListStateTest.java | 4 +- 28 files changed, 149 insertions(+), 110 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java b/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java new file mode 100644 index 000000000..5bb7d65fe --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/KeyValueIterator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.IOException; + +/** An iterator for Key and Value. */ +public interface KeyValueIterator<K, V> { + + boolean advanceNext() throws IOException; + + K getKey(); + + V getValue(); +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java similarity index 98% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java rename to paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java index 56595cf14..25d089b83 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/ListDelimitedSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListDelimitedSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.utils; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.io.DataInputDeserializer; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java similarity index 91% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java rename to paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java index f18116679..30e47b985 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ProjectToRowDataFunction.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectToRowFunction.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.paimon.flink.utils; +package org.apache.paimon.utils; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SerBiFunction; import java.util.List; import java.util.Map; @@ -33,15 +32,14 @@ import java.util.stream.IntStream; import static org.apache.paimon.data.InternalRow.createFieldGetter; /** Project {@link BinaryRow} fields into {@link InternalRow}. */ -public class ProjectToRowDataFunction - implements SerBiFunction<InternalRow, BinaryRow, InternalRow> { +public class ProjectToRowFunction implements SerBiFunction<InternalRow, BinaryRow, InternalRow> { private final InternalRow.FieldGetter[] fieldGetters; private final Map<Integer, Integer> projectMapping; private final InternalRow.FieldGetter[] projectGetters; - public ProjectToRowDataFunction(RowType rowType, List<String> projectFields) { + public ProjectToRowFunction(RowType rowType, List<String> projectFields) { DataType[] types = rowType.getFieldTypes().toArray(new DataType[0]); this.fieldGetters = IntStream.range(0, types.length) diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index 3190bf5a4..935ba024d 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -31,6 +31,10 @@ under the License. <artifactId>paimon-core</artifactId> <name>Paimon : Core</name> + <properties> + <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.paimon</groupId> @@ -58,6 +62,13 @@ under the License. <version>${lz4.version}</version> </dependency> + <dependency> + <groupId>com.ververica</groupId> + <artifactId>frocksdbjni</artifactId> + <version>${frocksdbjni.version}</version> + <scope>provided</scope> + </dependency> + <!-- test dependencies --> <dependency> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java index 9f168d79e..2dfbc7075 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.index; +package org.apache.paimon.crosspartition; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.MergeEngine; @@ -28,10 +28,9 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; -import org.apache.paimon.flink.RocksDBOptions; -import org.apache.paimon.flink.lookup.RocksDBStateFactory; -import org.apache.paimon.flink.lookup.RocksDBValueState; -import org.apache.paimon.flink.utils.ProjectToRowDataFunction; +import org.apache.paimon.lookup.RocksDBOptions; +import org.apache.paimon.lookup.RocksDBStateFactory; +import org.apache.paimon.lookup.RocksDBValueState; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.options.Options; import org.apache.paimon.sort.BinaryExternalSortBuffer; @@ -46,14 +45,14 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.IDMapping; +import org.apache.paimon.utils.KeyValueIterator; import org.apache.paimon.utils.MutableObjectIterator; import org.apache.paimon.utils.OffsetRow; import org.apache.paimon.utils.PositiveIntInt; import org.apache.paimon.utils.PositiveIntIntSerializer; +import org.apache.paimon.utils.ProjectToRowFunction; import org.apache.paimon.utils.TypeUtils; -import org.apache.flink.table.runtime.util.KeyValueIterator; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -82,7 +81,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable { private transient IOManager ioManager; private transient int bucketIndex; - private transient ProjectToRowDataFunction setPartition; + private transient ProjectToRowFunction setPartition; private transient boolean bootstrap; private transient BinaryExternalSortBuffer bootstrapKeys; private transient RowBuffer bootstrapRecords; @@ -120,7 +119,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable { RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema()); this.bucketIndex = bootstrapType.getFieldCount() - 1; - this.setPartition = new ProjectToRowDataFunction(table.rowType(), table.partitionKeys()); + this.setPartition = new ProjectToRowFunction(table.rowType(), table.partitionKeys()); CoreOptions coreOptions = table.coreOptions(); this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum(); @@ -219,7 +218,7 @@ public class GlobalIndexAssigner implements Serializable, Closeable { } }; - stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter); + stateFactory.bulkLoad(keyIndex, kvIter); isEmpty = false; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java index 392b29622..e99f47f36 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.index; +package org.apache.paimon.crosspartition; import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java rename to paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java index 188f3fa61..ea86358ac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartPartitionKeyExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.index; +package org.apache.paimon.crosspartition; import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.Projection; @@ -26,13 +26,11 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.PartitionKeyExtractor; import org.apache.paimon.types.RowType; -import org.apache.flink.table.data.RowData; - import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -/** A {@link PartitionKeyExtractor} to {@link RowData} with only key and partiton fields. */ +/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and partiton fields. */ public class KeyPartPartitionKeyExtractor implements PartitionKeyExtractor<InternalRow> { private final Projection partitionProjection; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java index 72ea7e9a0..8a47486af 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBListState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.lookup; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.utils.ListDelimitedSerializer; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java index d54a5ef8a..74b92bbc3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/RocksDBOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink; +package org.apache.paimon.lookup; import org.apache.paimon.annotation.Documentation; import org.apache.paimon.options.ConfigOption; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java index a27ff44a4..5d247e426 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBSetState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.lookup; import org.apache.paimon.data.serializer.Serializer; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java similarity index 97% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java index 3632069d7..729b38173 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.lookup; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.io.DataInputDeserializer; @@ -77,10 +77,6 @@ public abstract class RocksDBState<K, V, CacheV> { .build(); } - public ColumnFamilyHandle columnFamily() { - return columnFamily; - } - public byte[] serializeKey(K key) throws IOException { keyOutView.clear(); keySerializer.serialize(key, keyOutView); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java similarity index 76% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java index 757d08776..1bc1edbcd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java @@ -16,12 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.lookup; import org.apache.paimon.data.serializer.Serializer; -import org.apache.paimon.flink.RocksDBOptions; +import org.apache.paimon.utils.KeyValueIterator; -import org.apache.flink.table.runtime.util.KeyValueIterator; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -82,39 +81,43 @@ public class RocksDBStateFactory implements Closeable { } } - public void bulkLoad(ColumnFamilyHandle columnFamily, KeyValueIterator<byte[], byte[]> iterator) - throws RocksDBException, IOException { - long targetFileSize = options.targetFileSizeBase(); - - List<String> files = new ArrayList<>(); - SstFileWriter writer = null; - long recordNum = 0; - while (iterator.advanceNext()) { - byte[] key = iterator.getKey(); - byte[] value = iterator.getValue(); - - if (writer == null) { - writer = new SstFileWriter(new EnvOptions(), options); - String path = new File(this.path, "sst-" + (sstIndex++)).getPath(); - writer.open(path); - files.add(path); + public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator) + throws IOException { + try { + long targetFileSize = options.targetFileSizeBase(); + + List<String> files = new ArrayList<>(); + SstFileWriter writer = null; + long recordNum = 0; + while (iterator.advanceNext()) { + byte[] key = iterator.getKey(); + byte[] value = iterator.getValue(); + + if (writer == null) { + writer = new SstFileWriter(new EnvOptions(), options); + String path = new File(this.path, "sst-" + (sstIndex++)).getPath(); + writer.open(path); + files.add(path); + } + + writer.put(key, value); + recordNum++; + if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) { + writer.finish(); + writer = null; + recordNum = 0; + } } - writer.put(key, value); - recordNum++; - if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) { + if (writer != null) { writer.finish(); - writer = null; - recordNum = 0; } - } - if (writer != null) { - writer.finish(); - } - - if (files.size() > 0) { - db.ingestExternalFile(columnFamily, files, new IngestExternalFileOptions()); + if (files.size() > 0) { + db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions()); + } + } catch (Exception e) { + throw new IOException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java similarity index 93% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java index a12855d88..8fadb509e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java @@ -16,13 +16,12 @@ * limitations under the License. */ -package org.apache.paimon.flink.lookup; +package org.apache.paimon.lookup; import org.apache.paimon.data.serializer.Serializer; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; import javax.annotation.Nullable; @@ -52,7 +51,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref } } - private Reference get(ByteArray keyBytes) throws RocksDBException { + private Reference get(ByteArray keyBytes) throws Exception { Reference valueRef = cache.getIfPresent(keyBytes); if (valueRef == null) { valueRef = ref(db.get(columnFamily, keyBytes.bytes)); @@ -70,7 +69,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref byte[] valueBytes = serializeValue(value); db.put(columnFamily, writeOptions, keyBytes, valueBytes); cache.put(wrap(keyBytes), ref(valueBytes)); - } catch (RocksDBException e) { + } catch (Exception e) { throw new IOException(e); } } @@ -83,7 +82,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref db.delete(columnFamily, writeOptions, keyBytes); cache.put(keyByteArray, ref(null)); } - } catch (RocksDBException e) { + } catch (Exception e) { throw new IOException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java similarity index 84% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java rename to paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java index b2e8d0871..7a4ae602b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/GlobalIndexAssignerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.index; +package org.apache.paimon.crosspartition; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.MergeEngine; @@ -29,8 +29,9 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.Pair; -import org.apache.flink.api.java.tuple.Tuple2; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.io.File; @@ -40,7 +41,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link GlobalIndexAssigner}. */ @@ -73,7 +73,7 @@ public class GlobalIndexAssignerTest extends TableTestBase { .options(options.toMap()) .build(); catalog.createTable(identifier, schema, true); - return GlobalIndexAssignerOperator.createRowDataAssigner(catalog.getTable(identifier)); + return new GlobalIndexAssigner(catalog.getTable(identifier)); } @Test @@ -126,38 +126,38 @@ public class GlobalIndexAssignerTest extends TableTestBase { @Test public void testUpsert() throws Exception { GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE); - List<Tuple2<InternalRow, Integer>> output = new ArrayList<>(); - assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket))); + List<Pair<InternalRow, Integer>> output = new ArrayList<>(); + assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket))); assigner.endBoostrap(false); // change partition assigner.processInput(GenericRow.of(1, 1, 1)); assigner.processInput(GenericRow.of(2, 1, 2)); - assertThat(output) + Assertions.assertThat(output) .containsExactly( - new Tuple2<>(GenericRow.of(1, 1, 1), 0), - new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2), 0), - new Tuple2<>(GenericRow.of(2, 1, 2), 0)); + Pair.of(GenericRow.of(1, 1, 1), 0), + Pair.of(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2), 0), + Pair.of(GenericRow.of(2, 1, 2), 0)); output.clear(); // test partition 1 deleted assigner.processInput(GenericRow.of(1, 2, 2)); assigner.processInput(GenericRow.of(1, 3, 3)); assigner.processInput(GenericRow.of(1, 4, 4)); - assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0); + assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0); output.clear(); // move from full bucket assigner.processInput(GenericRow.of(2, 4, 4)); - assertThat(output) + Assertions.assertThat(output) .containsExactly( - new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4, 4), 0), - new Tuple2<>(GenericRow.of(2, 4, 4), 0)); + Pair.of(GenericRow.ofKind(RowKind.DELETE, 1, 4, 4), 0), + Pair.of(GenericRow.of(2, 4, 4), 0)); output.clear(); // test partition 1 deleted assigner.processInput(GenericRow.of(1, 5, 5)); - assertThat(output.stream().map(t -> t.f1)).containsExactly(0); + assertThat(output.stream().map(Pair::getRight)).containsExactly(0); output.clear(); assigner.close(); @@ -170,24 +170,23 @@ public class GlobalIndexAssignerTest extends TableTestBase { ? MergeEngine.PARTIAL_UPDATE : MergeEngine.AGGREGATE; GlobalIndexAssigner assigner = createAssigner(mergeEngine); - List<Tuple2<InternalRow, Integer>> output = new ArrayList<>(); - assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket))); + List<Pair<InternalRow, Integer>> output = new ArrayList<>(); + assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket))); assigner.endBoostrap(false); // change partition assigner.processInput(GenericRow.of(1, 1, 1)); assigner.processInput(GenericRow.of(2, 1, 2)); - assertThat(output) + Assertions.assertThat(output) .containsExactly( - new Tuple2<>(GenericRow.of(1, 1, 1), 0), - new Tuple2<>(GenericRow.of(1, 1, 2), 0)); + Pair.of(GenericRow.of(1, 1, 1), 0), Pair.of(GenericRow.of(1, 1, 2), 0)); output.clear(); // test partition 2 no effect assigner.processInput(GenericRow.of(2, 2, 2)); assigner.processInput(GenericRow.of(2, 3, 3)); assigner.processInput(GenericRow.of(2, 4, 4)); - assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0); + assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0); output.clear(); assigner.close(); } @@ -195,21 +194,21 @@ public class GlobalIndexAssignerTest extends TableTestBase { @Test public void testFirstRow() throws Exception { GlobalIndexAssigner assigner = createAssigner(MergeEngine.FIRST_ROW); - List<Tuple2<InternalRow, Integer>> output = new ArrayList<>(); - assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket))); + List<Pair<InternalRow, Integer>> output = new ArrayList<>(); + assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(Pair.of(row, bucket))); assigner.endBoostrap(false); // change partition assigner.processInput(GenericRow.of(1, 1, 1)); assigner.processInput(GenericRow.of(2, 1, 2)); - assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1, 1), 0)); + Assertions.assertThat(output).containsExactly(Pair.of(GenericRow.of(1, 1, 1), 0)); output.clear(); // test partition 2 no effect assigner.processInput(GenericRow.of(2, 2, 2)); assigner.processInput(GenericRow.of(2, 3, 3)); assigner.processInput(GenericRow.of(2, 4, 4)); - assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0); + assertThat(output.stream().map(Pair::getRight)).containsExactly(0, 0, 0); output.clear(); assigner.close(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java rename to paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index ca73dfebf..976466464 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.index; +package org.apache.paimon.crosspartition; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; @@ -42,11 +42,11 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import static org.apache.paimon.crosspartition.IndexBootstrap.filterSplit; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; -import static org.apache.paimon.flink.sink.index.IndexBootstrap.filterSplit; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link IndexBootstrap}. */ +/** Test for {@link org.apache.paimon.crosspartition.IndexBootstrap}. */ public class IndexBootstrapTest extends TableTestBase { @Test diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java index a8b9cc0bf..e069c967f 100644 --- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java +++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java @@ -74,6 +74,7 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation[] { new OptionsClassLocation("paimon-common", "org.apache.paimon.options"), new OptionsClassLocation("paimon-common", "org.apache.paimon"), + new OptionsClassLocation("paimon-core", "org.apache.paimon.lookup"), new OptionsClassLocation( "paimon-flink/paimon-flink-common", "org.apache.paimon.flink"), new OptionsClassLocation( diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index aa8bda419..b1dcb26ba 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -35,7 +35,6 @@ under the License. <properties> <flink.version>1.18.0</flink.version> - <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version> </properties> <dependencies> @@ -67,12 +66,6 @@ under the License. <scope>provided</scope> </dependency> - <dependency> - <groupId>com.ververica</groupId> - <artifactId>frocksdbjni</artifactId> - <version>${frocksdbjni.version}</version> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 36f393210..c26e373b2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.utils.TableScanUtils; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateFilter; @@ -59,8 +60,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; -import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CACHE_ROWS; -import static org.apache.paimon.flink.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS; +import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; /** A lookup {@link TableFunction} for file store. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index 29f319ee1..7aef1838a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.types.RowType; import java.io.IOException; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index caa809062..0af3e9e90 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -22,6 +22,8 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.lookup.RocksDBListState; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.KeyProjectedRow; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index 0928bf047..b4c8bbe7f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -20,6 +20,8 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.lookup.RocksDBStateFactory; +import org.apache.paimon.lookup.RocksDBValueState; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.KeyProjectedRow; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 61452d681..4364036e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -20,6 +20,8 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.lookup.RocksDBSetState; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.KeyProjectedRow; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java index eb2f8d30f..589f86a86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink.index; +import org.apache.paimon.crosspartition.IndexBootstrap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator; @@ -43,8 +44,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; -import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType; /** Sink for global dynamic bucket table. */ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java index 8133177a9..4cb56435c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink.index; +import org.apache.paimon.crosspartition.GlobalIndexAssigner; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.table.Table; @@ -103,10 +104,6 @@ public class GlobalIndexAssignerOperator } public static GlobalIndexAssignerOperator forRowData(Table table) { - return new GlobalIndexAssignerOperator(createRowDataAssigner(table)); - } - - public static GlobalIndexAssigner createRowDataAssigner(Table t) { - return new GlobalIndexAssigner(t); + return new GlobalIndexAssignerOperator(new GlobalIndexAssigner(table)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java index d5f0683a7..a75274be9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrapOperator.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink.index; +import org.apache.paimon.crosspartition.IndexBootstrap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.SerializableFunction; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java index 8ff6c0089..5eacc7d7b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java @@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkRowData; -import org.apache.paimon.flink.RocksDBOptions; +import org.apache.paimon.lookup.RocksDBOptions; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index dc1932b78..181eefe40 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -20,6 +20,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowKind; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java index 09c2ea106..e0a2b516d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java @@ -26,6 +26,8 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.lookup.RocksDBListState; +import org.apache.paimon.lookup.RocksDBStateFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -40,7 +42,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link org.apache.paimon.flink.lookup.RocksDBListState}. */ +/** Test for {@link RocksDBListState}. */ public class RocksDBListStateTest { @TempDir Path tempDir;