This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6191c3349 [core] Introduce IndexMaintainer to FileStoreWrite and
HashBucketAssigner (#1354)
6191c3349 is described below
commit 6191c3349514158434d0705e44b5c4eba1490c78
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 12 17:42:46 2023 +0800
[core] Introduce IndexMaintainer to FileStoreWrite and HashBucketAssigner
(#1354)
---
.../java/org/apache/paimon/KeyValueFileStore.java | 7 +
.../apache/paimon/index/HashBucketAssigner.java | 227 +++++++++++++++++++++
.../apache/paimon/index/HashIndexMaintainer.java | 109 ++++++++++
.../org/apache/paimon/index/IndexMaintainer.java | 36 ++++
.../paimon/operation/AbstractFileStoreWrite.java | 100 +++++----
.../paimon/operation/AppendOnlyFileStoreWrite.java | 2 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 4 +-
.../paimon/operation/MemoryFileStoreWrite.java | 8 +-
.../paimon/table/AbstractFileStoreTable.java | 5 +-
.../apache/paimon/table/sink/DynamicBucketRow.java | 133 ++++++++++++
.../table/sink/DynamicBucketRowKeyExtractor.java | 50 +++++
.../apache/paimon/table/sink/TableWriteImpl.java | 6 +-
.../paimon/catalog/PrimaryKeyTableTestBase.java | 63 ++++++
.../paimon/index/HashBucketAssignerTest.java | 167 +++++++++++++++
.../paimon/index/HashIndexMaintainerTest.java | 126 ++++++++++++
.../apache/paimon/table/sink/TableWriteTest.java | 5 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 4 +-
.../flink/source/TestChangelogDataReadWrite.java | 1 +
18 files changed, 996 insertions(+), 57 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 9ee654e23..ad0ad67dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -22,6 +22,8 @@ import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.KeyValueFileStoreRead;
@@ -110,6 +112,10 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreWrite newWrite(String commitUser,
ManifestCacheFilter manifestFilter) {
+ IndexMaintainer.Factory<KeyValue> indexFactory = null;
+ if (bucketMode() == BucketMode.DYNAMIC) {
+ indexFactory = new
HashIndexMaintainer.Factory(newIndexFileHandler());
+ }
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
@@ -123,6 +129,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
+ indexFactory,
options,
keyValueFieldsExtractor);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
new file mode 100644
index 000000000..aa3336b04
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.utils.Int2ShortHashMap;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Assign bucket for key hashcode. */
+public class HashBucketAssigner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HashBucketAssigner.class);
+
+ private final SnapshotManager snapshotManager;
+ private final String commitUser;
+ private final IndexFileHandler indexFileHandler;
+ private final int numAssigners;
+ private final int assignId;
+ private final long targetBucketRowNumber;
+
+ private final Map<BinaryRow, PartitionIndex> partitionIndex;
+
+ public HashBucketAssigner(
+ SnapshotManager snapshotManager,
+ String commitUser,
+ IndexFileHandler indexFileHandler,
+ int numAssigners,
+ int assignId,
+ long targetBucketRowNumber) {
+ this.snapshotManager = snapshotManager;
+ this.commitUser = commitUser;
+ this.indexFileHandler = indexFileHandler;
+ this.numAssigners = numAssigners;
+ this.assignId = assignId;
+ this.targetBucketRowNumber = targetBucketRowNumber;
+ this.partitionIndex = new HashMap<>();
+ }
+
+ /** Assign a bucket for key hash of a record. */
+ public int assign(BinaryRow partition, int hash) {
+ int recordAssignId = computeAssignId(hash);
+ checkArgument(
+ recordAssignId == assignId,
+ "This is a bug, record assign id %s should equal to assign id
%s.",
+ recordAssignId,
+ assignId);
+
+ // 1. is it a key that has appeared before
+ PartitionIndex index = partitionIndex.computeIfAbsent(partition,
this::loadIndex);
+ index.accessed = true;
+ Int2ShortHashMap hash2Bucket = index.hash2Bucket;
+ if (hash2Bucket.containsKey(hash)) {
+ return hash2Bucket.get(hash);
+ }
+
+ // 2. find bucket from existing buckets
+ Map<Integer, Long> buckets = index.bucketInformation;
+ for (Integer bucket : buckets.keySet()) {
+ if (computeAssignId(bucket) == assignId) {
+ // it is my bucket
+ Long number = buckets.get(bucket);
+ if (number < targetBucketRowNumber) {
+ buckets.put(bucket, number + 1);
+ hash2Bucket.put(hash, bucket.shortValue());
+ return bucket;
+ }
+ }
+ }
+
+ // 3. create a new bucket
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ if (computeAssignId(i) == assignId && !buckets.containsKey(i)) {
+ hash2Bucket.put(hash, (short) i);
+ buckets.put(i, 1L);
+ return i;
+ }
+ }
+
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ int maxBucket =
buckets.keySet().stream().mapToInt(Integer::intValue).max().getAsInt();
+ throw new RuntimeException(
+ String.format(
+ "To more bucket %s, you should increase target bucket
row number %s.",
+ maxBucket, targetBucketRowNumber));
+ }
+
+ /** Prepare commit to clear outdated partition index. */
+ public void prepareCommit(long commitIdentifier) {
+ long latestCommittedIdentifier;
+ if (partitionIndex.values().stream()
+ .mapToLong(i -> i.lastAccessedCommitIdentifier)
+ .max()
+ .orElse(Long.MIN_VALUE)
+ == Long.MIN_VALUE) {
+ // Optimization for the first commit.
+ //
+ // If this is the first commit, no index has previous modified
commit, so the value of
+ // `latestCommittedIdentifier` does not matter.
+ //
+ // Without this optimization, we may need to scan through all
snapshots only to find
+ // that there is no previous snapshot by this user, which is very
inefficient.
+ latestCommittedIdentifier = Long.MIN_VALUE;
+ } else {
+ latestCommittedIdentifier =
+ snapshotManager
+ .latestSnapshotOfUser(commitUser)
+ .map(Snapshot::commitIdentifier)
+ .orElse(Long.MIN_VALUE);
+ }
+
+ Iterator<Map.Entry<BinaryRow, PartitionIndex>> iterator =
+ partitionIndex.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<BinaryRow, PartitionIndex> entry = iterator.next();
+ BinaryRow partition = entry.getKey();
+ PartitionIndex index = entry.getValue();
+ if (index.accessed) {
+ index.lastAccessedCommitIdentifier = commitIdentifier;
+ } else {
+ if (index.lastAccessedCommitIdentifier <=
latestCommittedIdentifier) {
+ // Clear writer if no update, and if its latest
modification has committed.
+ //
+ // We need a mechanism to clear index, otherwise there
will be more and
+ // more such as yesterday's partition that no longer needs
to be accessed.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Removing index for partition {}. "
+ + "Index's last accessed identifier is
{}, "
+ + "while latest committed identifier
is {}",
+ partition,
+ index.lastAccessedCommitIdentifier,
+ latestCommittedIdentifier);
+ }
+ iterator.remove();
+ }
+ }
+ index.accessed = false;
+ }
+ }
+
+ @VisibleForTesting
+ Set<BinaryRow> currentPartitions() {
+ return partitionIndex.keySet();
+ }
+
+ private int computeAssignId(int hash) {
+ return Math.abs(hash % numAssigners);
+ }
+
+ private PartitionIndex loadIndex(BinaryRow partition) {
+ Int2ShortHashMap map = new Int2ShortHashMap();
+ List<IndexManifestEntry> files = indexFileHandler.scan(HASH_INDEX,
partition);
+ Map<Integer, Long> buckets = new HashMap<>();
+ for (IndexManifestEntry file : files) {
+ try (IntIterator iterator =
indexFileHandler.readHashIndex(file.indexFile())) {
+ while (true) {
+ try {
+ int hash = iterator.next();
+ if (computeAssignId(hash) == assignId) {
+ map.put(hash, (short) file.bucket());
+ }
+ buckets.compute(
+ file.bucket(), (bucket, number) -> number ==
null ? 1 : number + 1);
+ } catch (EOFException ignored) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return new PartitionIndex(map, buckets);
+ }
+
+ private static class PartitionIndex {
+
+ private final Int2ShortHashMap hash2Bucket;
+
+ private final Map<Integer, Long> bucketInformation;
+
+ private boolean accessed;
+
+ private long lastAccessedCommitIdentifier;
+
+ private PartitionIndex(Int2ShortHashMap hash2Bucket, Map<Integer,
Long> bucketInformation) {
+ this.hash2Bucket = hash2Bucket;
+ this.bucketInformation = bucketInformation;
+ this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
+ this.accessed = true;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
new file mode 100644
index 000000000..644be637f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.IntHashSet;
+import org.apache.paimon.utils.IntIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** An {@link IndexMaintainer} for dynamic bucket to maintain key hashcode in
a bucket. */
+public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
+
+ private final IndexFileHandler fileHandler;
+ private final IntHashSet hashcode;
+
+ private boolean modified;
+
+ private HashIndexMaintainer(
+ IndexFileHandler fileHandler, Long snapshotId, BinaryRow
partition, int bucket) {
+ this.fileHandler = fileHandler;
+ IntHashSet hashcode = new IntHashSet();
+ if (snapshotId != null) {
+ Optional<IndexFileMeta> indexFile =
+ fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX,
partition, bucket);
+ if (indexFile.isPresent()) {
+ IndexFileMeta file = indexFile.get();
+ hashcode = new IntHashSet((int) file.rowCount());
+ restore(fileHandler, hashcode, file);
+ }
+ }
+ this.hashcode = hashcode;
+ this.modified = false;
+ }
+
+ private void restore(IndexFileHandler fileHandler, IntHashSet hashcode,
IndexFileMeta file) {
+ try (IntIterator iterator = fileHandler.readHashIndex(file)) {
+ while (true) {
+ try {
+ hashcode.add(iterator.next());
+ } catch (EOFException ignored) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void notifyNewRecord(KeyValue record) {
+ InternalRow key = record.key();
+ if (!(key instanceof BinaryRow)) {
+ throw new IllegalArgumentException("Unsupported key type: " +
key.getClass());
+ }
+ hashcode.add(key.hashCode());
+ modified = true;
+ }
+
+ @Override
+ public List<IndexFileMeta> prepareCommit() {
+ if (modified) {
+ IndexFileMeta entry =
+ fileHandler.writeHashIndex(hashcode.size(),
hashcode.toIntIterator());
+ modified = false;
+ return Collections.singletonList(entry);
+ }
+ return Collections.emptyList();
+ }
+
+ /** Factory to restore {@link HashIndexMaintainer}. */
+ public static class Factory implements IndexMaintainer.Factory<KeyValue> {
+
+ private final IndexFileHandler handler;
+
+ public Factory(IndexFileHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public IndexMaintainer<KeyValue> createOrRestore(
+ Long snapshotId, BinaryRow partition, int bucket) {
+ return new HashIndexMaintainer(handler, snapshotId, partition,
bucket);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
new file mode 100644
index 000000000..8d61d46f8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.List;
+
+/** Maintainer to maintain index. */
+public interface IndexMaintainer<T> {
+
+ void notifyNewRecord(T record);
+
+ List<IndexFileMeta> prepareCommit();
+
+ /** Factory to restore {@link IndexMaintainer}. */
+ interface Factory<T> {
+ IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow
partition, int bucket);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 06062800f..093b96fe7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -22,7 +22,10 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.sink.CommitMessage;
@@ -40,7 +43,6 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -54,13 +56,14 @@ import java.util.concurrent.Executors;
* @param <T> type of record to write.
*/
public abstract class AbstractFileStoreWrite<T>
- implements FileStoreWrite<T>,
Restorable<List<AbstractFileStoreWrite.State>> {
+ implements FileStoreWrite<T>,
Restorable<List<AbstractFileStoreWrite.State<T>>> {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractFileStoreWrite.class);
private final String commitUser;
- private final SnapshotManager snapshotManager;
+ protected final SnapshotManager snapshotManager;
private final FileStoreScan scan;
+ @Nullable private final IndexMaintainer.Factory<T> indexFactory;
@Nullable protected IOManager ioManager;
@@ -70,10 +73,14 @@ public abstract class AbstractFileStoreWrite<T>
private boolean emptyWriter = false;
protected AbstractFileStoreWrite(
- String commitUser, SnapshotManager snapshotManager, FileStoreScan
scan) {
+ String commitUser,
+ SnapshotManager snapshotManager,
+ FileStoreScan scan,
+ @Nullable IndexMaintainer.Factory<T> indexFactory) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
+ this.indexFactory = indexFactory;
this.writers = new HashMap<>();
}
@@ -96,8 +103,11 @@ public abstract class AbstractFileStoreWrite<T>
@Override
public void write(BinaryRow partition, int bucket, T data) throws
Exception {
- RecordWriter<T> writer = getWriterWrapper(partition, bucket).writer;
- writer.write(data);
+ WriterContainer<T> container = getWriterWrapper(partition, bucket);
+ container.writer.write(data);
+ if (container.indexMaintainer != null) {
+ container.indexMaintainer.notifyNewRecord(data);
+ }
}
@Override
@@ -165,12 +175,17 @@ public abstract class AbstractFileStoreWrite<T>
WriterContainer<T> writerContainer = entry.getValue();
CommitIncrement increment =
writerContainer.writer.prepareCommit(waitCompaction);
+ List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ if (writerContainer.indexMaintainer != null) {
+ newIndexFiles =
writerContainer.indexMaintainer.prepareCommit();
+ }
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
increment.newFilesIncrement(),
- increment.compactIncrement());
+ increment.compactIncrement(),
+ new IndexIncrement(newIndexFiles));
result.add(committable);
if (committable.isEmpty()) {
@@ -219,8 +234,8 @@ public abstract class AbstractFileStoreWrite<T>
}
@Override
- public List<State> checkpoint() {
- List<State> result = new ArrayList<>();
+ public List<State<T>> checkpoint() {
+ List<State<T>> result = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>
partitionEntry :
writers.entrySet()) {
@@ -245,25 +260,26 @@ public abstract class AbstractFileStoreWrite<T>
// compaction result might be updated during prepareCommit
Collection<DataFileMeta> dataFiles =
writerContainer.writer.dataFiles();
result.add(
- new State(
+ new State<>(
partition,
bucket,
writerContainer.baseSnapshotId,
writerContainer.lastModifiedCommitIdentifier,
dataFiles,
+ writerContainer.indexMaintainer,
increment));
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Extracted state " + result.toString());
+ LOG.debug("Extracted state " + result);
}
return result;
}
@Override
- public void restore(List<State> states) {
- for (State state : states) {
+ public void restore(List<State<T>> states) {
+ for (State<T> state : states) {
RecordWriter<T> writer =
createWriter(
state.partition,
@@ -273,7 +289,7 @@ public abstract class AbstractFileStoreWrite<T>
compactExecutor());
notifyNewWriter(writer);
WriterContainer<T> writerContainer =
- new WriterContainer<>(writer, state.baseSnapshotId);
+ new WriterContainer<>(writer, state.indexMaintainer,
state.baseSnapshotId);
writerContainer.lastModifiedCommitIdentifier =
state.lastModifiedCommitIdentifier;
writers.computeIfAbsent(state.partition, k -> new HashMap<>())
.put(state.bucket, writerContainer);
@@ -298,38 +314,27 @@ public abstract class AbstractFileStoreWrite<T>
}
Long latestSnapshotId = snapshotManager.latestSnapshotId();
- RecordWriter<T> writer;
- if (emptyWriter) {
- writer =
- createWriter(
- partition.copy(),
- bucket,
- Collections.emptyList(),
- null,
- compactExecutor());
- } else {
- writer =
- createWriter(
- partition.copy(),
- bucket,
- scanExistingFileMetas(latestSnapshotId, partition,
bucket),
- null,
- compactExecutor());
+ List<DataFileMeta> restoreFiles = new ArrayList<>();
+ if (!emptyWriter && latestSnapshotId != null) {
+ restoreFiles = scanExistingFileMetas(latestSnapshotId, partition,
bucket);
}
+ IndexMaintainer<T> indexMaintainer =
+ indexFactory == null
+ ? null
+ : indexFactory.createOrRestore(latestSnapshotId,
partition, bucket);
+ RecordWriter<T> writer =
+ createWriter(partition.copy(), bucket, restoreFiles, null,
compactExecutor());
notifyNewWriter(writer);
- return new WriterContainer<>(writer, latestSnapshotId);
+ return new WriterContainer<>(writer, indexMaintainer,
latestSnapshotId);
}
private List<DataFileMeta> scanExistingFileMetas(
- Long snapshotId, BinaryRow partition, int bucket) {
+ long snapshotId, BinaryRow partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
- if (snapshotId != null) {
- // Concat all the DataFileMeta of existing files into
existingFileMetas.
- scan.withSnapshot(snapshotId).withPartitionBucket(partition,
bucket).plan().files()
- .stream()
- .map(ManifestEntry::file)
- .forEach(existingFileMetas::add);
- }
+ // Concat all the DataFileMeta of existing files into
existingFileMetas.
+ scan.withSnapshot(snapshotId).withPartitionBucket(partition,
bucket).plan().files().stream()
+ .map(ManifestEntry::file)
+ .forEach(existingFileMetas::add);
return existingFileMetas;
}
@@ -359,11 +364,16 @@ public abstract class AbstractFileStoreWrite<T>
@VisibleForTesting
public static class WriterContainer<T> {
public final RecordWriter<T> writer;
+ @Nullable public final IndexMaintainer<T> indexMaintainer;
protected final long baseSnapshotId;
protected long lastModifiedCommitIdentifier;
- protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId)
{
+ protected WriterContainer(
+ RecordWriter<T> writer,
+ @Nullable IndexMaintainer<T> indexMaintainer,
+ Long baseSnapshotId) {
this.writer = writer;
+ this.indexMaintainer = indexMaintainer;
this.baseSnapshotId =
baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 :
baseSnapshotId;
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
@@ -371,13 +381,14 @@ public abstract class AbstractFileStoreWrite<T>
}
/** Recoverable state of {@link AbstractFileStoreWrite}. */
- public static class State {
+ public static class State<T> {
protected final BinaryRow partition;
protected final int bucket;
protected final long baseSnapshotId;
protected final long lastModifiedCommitIdentifier;
protected final List<DataFileMeta> dataFiles;
+ @Nullable protected final IndexMaintainer<T> indexMaintainer;
protected final CommitIncrement commitIncrement;
protected State(
@@ -386,24 +397,27 @@ public abstract class AbstractFileStoreWrite<T>
long baseSnapshotId,
long lastModifiedCommitIdentifier,
Collection<DataFileMeta> dataFiles,
+ @Nullable IndexMaintainer<T> indexMaintainer,
CommitIncrement commitIncrement) {
this.partition = partition;
this.bucket = bucket;
this.baseSnapshotId = baseSnapshotId;
this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
this.dataFiles = new ArrayList<>(dataFiles);
+ this.indexMaintainer = indexMaintainer;
this.commitIncrement = commitIncrement;
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %d, %s, %s}",
+ "{%s, %d, %d, %d, %s, %s, %s}",
partition,
bucket,
baseSnapshotId,
lastModifiedCommitIdentifier,
dataFiles,
+ indexMaintainer,
commitIncrement);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 4cae8adce..fdfcb21c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -76,7 +76,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options) {
- super(commitUser, snapshotManager, scan);
+ super(commitUser, snapshotManager, scan, null);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index be6ca924b..86abcc947 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -92,9 +93,10 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
+ @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor) {
- super(commitUser, snapshotManager, scan, options);
+ super(commitUser, snapshotManager, scan, options, indexFactory);
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 40b8da562..6268eedd1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
@@ -32,6 +33,8 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Iterator;
import java.util.Map;
@@ -54,8 +57,9 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
- CoreOptions options) {
- super(commitUser, snapshotManager, scan);
+ CoreOptions options,
+ @Nullable IndexMaintainer.Factory<T> indexFactory) {
+ super(commitUser, snapshotManager, scan, indexFactory);
this.options = options;
this.cacheManager =
new CacheManager(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index db85d12b2..29da09274 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
@@ -89,9 +90,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
case FIXED:
return new FixedBucketRowKeyExtractor(schema());
case DYNAMIC:
- // todo: dynamic bucket table need to check current bucket,
when full, return the
- // next one.
- throw new UnsupportedOperationException("Not supported yet.");
+ return new DynamicBucketRowKeyExtractor(schema());
case UNAWARE:
return new UnawareBucketRowKeyExtractor(schema());
default:
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
new file mode 100644
index 000000000..73394a9f5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An {@link InternalRow} wraps another {@link InternalRow} with bucket. */
+public class DynamicBucketRow implements InternalRow {
+
+ private final InternalRow row;
+ private final int bucket;
+
+ public DynamicBucketRow(InternalRow row, int bucket) {
+ this.row = row;
+ this.bucket = bucket;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return row.getFieldCount();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return row.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ row.setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return row.isNullAt(pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return row.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return row.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return row.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return row.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return row.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return row.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return row.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return row.getString(pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return row.getDecimal(pos, precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ return row.getTimestamp(pos, precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return row.getBinary(pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return row.getArray(pos);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ return row.getMap(pos);
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return row.getRow(pos, numFields);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
new file mode 100644
index 000000000..6ae14b5f7
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * {@link KeyAndBucketExtractor} for {@link InternalRow} that takes the bucket from a {@link
+ * DynamicBucketRow} instead of deriving it.
+ */
+public class DynamicBucketRowKeyExtractor extends RowKeyExtractor {
+
+    /**
+     * Creates an extractor for {@code schema}.
+     *
+     * <p>Construction fails the {@code checkArgument} precondition unless the table options set
+     * {@code 'bucket' = '-1'}.
+     *
+     * @param schema the table schema whose options are validated
+     */
+    public DynamicBucketRowKeyExtractor(TableSchema schema) {
+        super(schema);
+        int configuredBuckets = new CoreOptions(schema.options()).bucket();
+        checkArgument(
+                configuredBuckets == -1,
+                "Only 'bucket' = '-1' is allowed for 'DynamicBucketRowKeyExtractor', but found: "
+                        + configuredBuckets);
+    }
+
+    /**
+     * Returns the bucket carried by the record currently held by this extractor.
+     *
+     * @throws IllegalArgumentException if that record is not a {@link DynamicBucketRow}
+     */
+    @Override
+    public int bucket() {
+        // Guard first: any other row type carries no pre-assigned bucket.
+        if (!(record instanceof DynamicBucketRow)) {
+            throw new IllegalArgumentException(
+                    "Only supports DynamicBucketRow, illegal record type: " + record.getClass());
+        }
+        DynamicBucketRow bucketed = (DynamicBucketRow) record;
+        return bucketed.bucket();
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 056625863..e3b524b9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -38,7 +38,7 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
* @param <T> type of record to write into {@link FileStore}.
*/
public class TableWriteImpl<T>
- implements InnerTableWrite,
Restorable<List<AbstractFileStoreWrite.State>> {
+ implements InnerTableWrite,
Restorable<List<AbstractFileStoreWrite.State<T>>> {
private final AbstractFileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
@@ -146,12 +146,12 @@ public class TableWriteImpl<T>
}
@Override
- public List<AbstractFileStoreWrite.State> checkpoint() {
+ public List<AbstractFileStoreWrite.State<T>> checkpoint() {
return write.checkpoint();
}
@Override
- public void restore(List<AbstractFileStoreWrite.State> state) {
+ public void restore(List<AbstractFileStoreWrite.State<T>> state) {
write.restore(state);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
new file mode 100644
index 000000000..4b10d329d
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.UUID;
+
+/**
+ * Base class for tests that need a catalog-backed primary key table.
+ *
+ * <p>Before each test, a filesystem catalog is created under a temp directory, and table {@code
+ * default.T} is created in it. Subclasses customize the table through {@link #tableOptions()}.
+ */
+public abstract class PrimaryKeyTableTestBase {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    protected AbstractFileStoreTable table;
+    protected String commitUser;
+
+    /**
+     * Creates database {@code default} and table {@code T} (if absent), then loads the table.
+     * Each test also gets a fresh random commit user.
+     */
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        Catalog catalog =
+                CatalogFactory.createCatalog(CatalogContext.create(new Path(tempPath.toUri())));
+        Identifier tableId = new Identifier("default", "T");
+        catalog.createDatabase(tableId.getDatabaseName(), true);
+        catalog.createTable(tableId, buildSchema(), true);
+        table = (AbstractFileStoreTable) catalog.getTable(tableId);
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    /**
+     * Builds the table schema from three int columns, {@code pt}, {@code pk} and {@code col1}.
+     * {@code pt} is the partition key, and {@code (pk, pt)} is the primary key.
+     */
+    private Schema buildSchema() {
+        return Schema.newBuilder()
+                .column("pt", DataTypes.INT())
+                .column("pk", DataTypes.INT())
+                .column("col1", DataTypes.INT())
+                .partitionKeys("pt")
+                .primaryKey("pk", "pt")
+                .options(tableOptions().toMap())
+                .build();
+    }
+
+    /** Options applied to the test table; empty by default, override to customize. */
+    protected Options tableOptions() {
+        return new Options();
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
new file mode 100644
index 000000000..99eff3756
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.NewFilesIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableCommit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HashBucketAssigner}. */
+public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
+
+    private IndexFileHandler fileHandler;
+    private StreamTableCommit commit;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        fileHandler = table.store().newIndexFileHandler();
+        commit = table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
+    }
+
+    /**
+     * Creates an assigner for this table and commit user.
+     *
+     * @param numAssigners number of assigners passed to {@link HashBucketAssigner}
+     * @param assignId id of this assigner, passed to {@link HashBucketAssigner}
+     */
+    private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
+        // NOTE(review): the trailing 5 looks like the per-bucket key limit (testAssign sees
+        // bucket 0 "full" after five keys); confirm against the HashBucketAssigner constructor.
+        return new HashBucketAssigner(
+                table.snapshotManager(), commitUser, fileHandler, numAssigners, assignId, 5);
+    }
+
+    @Test
+    public void testAssign() {
+        HashBucketAssigner assigner = createAssigner(2, 0);
+
+        // assign
+        assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 4)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 8)).isEqualTo(0);
+
+        // full
+        assertThat(assigner.assign(row(1), 10)).isEqualTo(2);
+
+        // another partition
+        assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
+
+        // read assigned
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+
+        // not mine
+        assertThatThrownBy(() -> assigner.assign(row(1), 1))
+                .hasMessageContaining("This is a bug, record assign id");
+    }
+
+    /**
+     * Builds a commit message that carries only {@code file} as a new index file. It has no new
+     * or compacted data files.
+     */
+    private CommitMessage createCommitMessage(BinaryRow partition, int bucket, IndexFileMeta file) {
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()),
+                new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
+                new IndexIncrement(Collections.singletonList(file)));
+    }
+
+    @Test
+    public void testAssignRestore() {
+        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {0, 2});
+        IndexFileMeta bucket1 = fileHandler.writeHashIndex(new int[] {3, 5});
+        commit.commit(
+                0,
+                Arrays.asList(
+                        createCommitMessage(row(1), 0, bucket0),
+                        createCommitMessage(row(1), 1, bucket1)));
+
+        HashBucketAssigner assigner0 = createAssigner(3, 0);
+        HashBucketAssigner assigner2 = createAssigner(3, 2);
+
+        // read assigned
+        assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 3)).isEqualTo(1);
+        assertThat(assigner2.assign(row(1), 5)).isEqualTo(1);
+
+        // new assign
+        assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 9)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 12)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 15)).isEqualTo(3);
+    }
+
+    @Test
+    public void testIndexEliminate() {
+        HashBucketAssigner assigner = createAssigner(1, 0);
+
+        // checkpoint 0
+        assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner.assign(row(2), 0)).isEqualTo(0);
+        assigner.prepareCommit(0);
+        commit.commit(
+                0,
+                Arrays.asList(
+                        createCommitMessage(row(1), 0, fileHandler.writeHashIndex(new int[] {0})),
+                        createCommitMessage(row(2), 0, fileHandler.writeHashIndex(new int[] {0}))));
+        assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), row(2));
+
+        // checkpoint 1, but no commit
+        assertThat(assigner.assign(row(1), 1)).isEqualTo(0);
+        assigner.prepareCommit(1);
+        assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1));
+
+        // checkpoint 2
+        assigner.prepareCommit(2);
+        assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1));
+
+        // checkpoint 3 and commit checkpoint 1
+        commit.commit(
+                1,
+                Collections.singletonList(
+                        createCommitMessage(row(1), 0, fileHandler.writeHashIndex(new int[] {1}))));
+        assigner.prepareCommit(3);
+        assertThat(assigner.currentPartitions()).isEmpty();
+    }
+
+    /**
+     * Writes a hash index file and verifies two things:
+     * <ul>
+     *   <li>the file is non-empty;
+     *   <li>it reads back exactly the written hashes.
+     * </ul>
+     *
+     * <p>The previous version generated 10 million unseeded random ints and only printed the file
+     * name and size. That was slow, nondeterministic, and asserted nothing.
+     */
+    @Test
+    public void testFileSize() {
+        // Fixed seed for determinism; size kept modest so the test stays fast.
+        Random rnd = new Random(42);
+        List<Integer> ints = new ArrayList<>();
+        for (int i = 0; i < 100_000; i++) {
+            ints.add(rnd.nextInt());
+        }
+
+        IndexFileMeta file =
+                fileHandler.writeHashIndex(ints.stream().mapToInt(Integer::intValue).toArray());
+        assertThat(file.fileSize()).isPositive();
+
+        // Compare sorted copies so the test does not depend on the read-back order
+        // (a sort is O(n log n), unlike AssertJ's in-any-order matching).
+        List<Integer> expected = new ArrayList<>(ints);
+        Collections.sort(expected);
+        List<Integer> actual = new ArrayList<>(fileHandler.readHashIndexList(file));
+        Collections.sort(actual);
+        assertThat(actual).isEqualTo(expected);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
new file mode 100644
index 000000000..8c646457e
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HashIndexMaintainer}. */
+public class HashIndexMaintainerTest extends PrimaryKeyTableTestBase {
+
+    private IndexFileHandler fileHandler;
+    private StreamWriteBuilder writeBuilder;
+    private StreamTableWrite write;
+    private StreamTableCommit commit;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        fileHandler = table.store().newIndexFileHandler();
+        writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+    }
+
+    /**
+     * Sets {@code bucket = -1} on the table. This is the only value {@code
+     * DynamicBucketRowKeyExtractor} accepts.
+     */
+    @Override
+    protected Options tableOptions() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, -1);
+        return options;
+    }
+
+    /** Wraps {@code (partition, key, value)} in a {@link DynamicBucketRow} with {@code bucket}. */
+    private DynamicBucketRow createRow(int partition, int bucket, int key, int value) {
+        return new DynamicBucketRow(GenericRow.of(partition, key, value), bucket);
+    }
+
+    /**
+     * Reads the hashes stored in each commit message's first new index file. The result is keyed
+     * by partition, then by bucket. Messages with no new index files are skipped.
+     */
+    private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> messages) {
+        Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
+        for (CommitMessage commitMessage : messages) {
+            CommitMessageImpl message = (CommitMessageImpl) commitMessage;
+            List<IndexFileMeta> files = message.indexIncrement().newIndexFiles();
+            if (files.isEmpty()) {
+                continue;
+            }
+            int[] ints =
+                    fileHandler.readHashIndexList(files.get(0)).stream()
+                            .mapToInt(Integer::intValue)
+                            .toArray();
+            index.computeIfAbsent(message.partition(), k -> new HashMap<>())
+                    .put(message.bucket(), ints);
+        }
+        return index;
+    }
+
+    @Test
+    public void testAssignBucket() throws Exception {
+        assertThatThrownBy(() -> write.write(GenericRow.of(1, 1, 1)))
+                .hasMessageContaining("Only supports DynamicBucketRow");
+
+        // commit two partitions
+        write.write(createRow(1, 1, 1, 1));
+        write.write(createRow(2, 2, 2, 2));
+        List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+        Map<BinaryRow, Map<Integer, int[]>> index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1), row(2));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        assertThat(index.get(row(1)).get(1)).containsExactlyInAnyOrder(1465514398);
+        assertThat(index.get(row(2)).get(2)).containsExactlyInAnyOrder(1340390384);
+        commit.commit(0, commitMessages);
+
+        // only one partition
+        write.write(createRow(1, 1, 2, 2));
+        commitMessages = write.prepareCommit(true, 1);
+        index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        assertThat(index.get(row(1)).get(1)).containsExactlyInAnyOrder(1340390384, 1465514398);
+        commit.commit(1, commitMessages);
+
+        // restore: close the previous write before replacing it; otherwise it is never closed
+        write.close();
+        write = writeBuilder.newWrite();
+        write.write(createRow(1, 1, 3, 3));
+        commitMessages = write.prepareCommit(true, 2);
+        index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        assertThat(index.get(row(1)).get(1))
+                .containsExactlyInAnyOrder(-771300025, 1340390384, 1465514398);
+
+        write.close();
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index 899f96b38..aec89ab4c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -127,10 +127,11 @@ public class TableWriteTest {
commitId++;
break;
case EXTRACT_STATE:
- List<AbstractFileStoreWrite.State> state =
write.checkpoint();
+ List<? extends AbstractFileStoreWrite.State<?>>
state =
+ write.checkpoint();
write.close();
write = table.newWrite(commitUser);
- write.restore(state);
+ write.restore((List) state);
break;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 344413c46..3b2baf0b3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -151,9 +151,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
return;
}
- List<AbstractFileStoreWrite.State> states = write.checkpoint();
+ List<? extends AbstractFileStoreWrite.State<?>> states =
write.checkpoint();
write.close();
write = newTableWrite(newTable);
- write.restore(states);
+ write.restore((List) states);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 24f0b7164..3106ed679 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -185,6 +185,7 @@ public class TestChangelogDataReadWrite {
pathFactory,
snapshotManager,
null, // not used, we only create an empty
writer
+ null,
options,
EXTRACTOR)
.createWriterContainer(partition, bucket, true)