This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6191c3349 [core] Introduce IndexMaintainer to FileStoreWrite and HashBucketAssigner (#1354)
6191c3349 is described below

commit 6191c3349514158434d0705e44b5c4eba1490c78
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 12 17:42:46 2023 +0800

    [core] Introduce IndexMaintainer to FileStoreWrite and HashBucketAssigner (#1354)
---
 .../java/org/apache/paimon/KeyValueFileStore.java  |   7 +
 .../apache/paimon/index/HashBucketAssigner.java    | 227 +++++++++++++++++++++
 .../apache/paimon/index/HashIndexMaintainer.java   | 109 ++++++++++
 .../org/apache/paimon/index/IndexMaintainer.java   |  36 ++++
 .../paimon/operation/AbstractFileStoreWrite.java   | 100 +++++----
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   2 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   4 +-
 .../paimon/operation/MemoryFileStoreWrite.java     |   8 +-
 .../paimon/table/AbstractFileStoreTable.java       |   5 +-
 .../apache/paimon/table/sink/DynamicBucketRow.java | 133 ++++++++++++
 .../table/sink/DynamicBucketRowKeyExtractor.java   |  50 +++++
 .../apache/paimon/table/sink/TableWriteImpl.java   |   6 +-
 .../paimon/catalog/PrimaryKeyTableTestBase.java    |  63 ++++++
 .../paimon/index/HashBucketAssignerTest.java       | 167 +++++++++++++++
 .../paimon/index/HashIndexMaintainerTest.java      | 126 ++++++++++++
 .../apache/paimon/table/sink/TableWriteTest.java   |   5 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |   4 +-
 .../flink/source/TestChangelogDataReadWrite.java   |   1 +
 18 files changed, 996 insertions(+), 57 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 9ee654e23..ad0ad67dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -22,6 +22,8 @@ import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.KeyValueFileStoreRead;
@@ -110,6 +112,10 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreWrite newWrite(String commitUser, 
ManifestCacheFilter manifestFilter) {
+        IndexMaintainer.Factory<KeyValue> indexFactory = null;
+        if (bucketMode() == BucketMode.DYNAMIC) {
+            indexFactory = new 
HashIndexMaintainer.Factory(newIndexFileHandler());
+        }
         return new KeyValueFileStoreWrite(
                 fileIO,
                 schemaManager,
@@ -123,6 +129,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 pathFactory(),
                 snapshotManager(),
                 newScan(true).withManifestCacheFilter(manifestFilter),
+                indexFactory,
                 options,
                 keyValueFieldsExtractor);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
new file mode 100644
index 000000000..aa3336b04
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.utils.Int2ShortHashMap;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Assigns buckets to records by the hashcode of their keys.
+ *
+ * <p>Key hashes and bucket ids are both routed to an assigner by {@code abs(x % numAssigners)};
+ * this instance only handles the hashes and buckets routed to {@code assignId}. A hash that has
+ * been seen before keeps its bucket. A new hash goes to one of this assigner's existing buckets
+ * that holds fewer than {@code targetBucketRowNumber} hashes, or else to a newly created bucket.
+ *
+ * <p>The index is loaded lazily per partition from the hash index files; partitions that are no
+ * longer assigned to are evicted by {@link #prepareCommit(long)}.
+ */
+public class HashBucketAssigner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HashBucketAssigner.class);
+
+    private final SnapshotManager snapshotManager;
+    private final String commitUser;
+    private final IndexFileHandler indexFileHandler;
+    private final int numAssigners;
+    private final int assignId;
+    private final long targetBucketRowNumber;
+
+    // Per-partition hash-to-bucket index, loaded lazily by assign().
+    private final Map<BinaryRow, PartitionIndex> partitionIndex;
+
+    public HashBucketAssigner(
+            SnapshotManager snapshotManager,
+            String commitUser,
+            IndexFileHandler indexFileHandler,
+            int numAssigners,
+            int assignId,
+            long targetBucketRowNumber) {
+        this.snapshotManager = snapshotManager;
+        this.commitUser = commitUser;
+        this.indexFileHandler = indexFileHandler;
+        this.numAssigners = numAssigners;
+        this.assignId = assignId;
+        this.targetBucketRowNumber = targetBucketRowNumber;
+        this.partitionIndex = new HashMap<>();
+    }
+
+    /**
+     * Assigns a bucket for the key hash of a record.
+     *
+     * <p>{@code hash} must be routed to this assigner, otherwise the precondition check fails.
+     *
+     * @param partition partition of the record
+     * @param hash hashcode of the record's key
+     * @return the bucket assigned to the record
+     * @throws RuntimeException if no new bucket id is left for this assigner
+     */
+    public int assign(BinaryRow partition, int hash) {
+        int recordAssignId = computeAssignId(hash);
+        checkArgument(
+                recordAssignId == assignId,
+                "This is a bug, record assign id %s should equal to assign id %s.",
+                recordAssignId,
+                assignId);
+
+        // 1. is it a key that has appeared before
+        PartitionIndex index = partitionIndex.computeIfAbsent(partition, this::loadIndex);
+        index.accessed = true;
+        Int2ShortHashMap hash2Bucket = index.hash2Bucket;
+        if (hash2Bucket.containsKey(hash)) {
+            return hash2Bucket.get(hash);
+        }
+
+        // 2. find bucket from existing buckets
+        Map<Integer, Long> buckets = index.bucketInformation;
+        for (Map.Entry<Integer, Long> entry : buckets.entrySet()) {
+            int bucket = entry.getKey();
+            if (computeAssignId(bucket) == assignId) {
+                // it is my bucket
+                long number = entry.getValue();
+                if (number < targetBucketRowNumber) {
+                    entry.setValue(number + 1);
+                    hash2Bucket.put(hash, (short) bucket);
+                    return bucket;
+                }
+            }
+        }
+
+        // 3. create a new bucket
+        for (int i = 0; i < Short.MAX_VALUE; i++) {
+            if (computeAssignId(i) == assignId && !buckets.containsKey(i)) {
+                hash2Bucket.put(hash, (short) i);
+                buckets.put(i, 1L);
+                return i;
+            }
+        }
+
+        // Empty only if no bucket id below Short.MAX_VALUE is routed to this assigner.
+        int maxBucket = buckets.keySet().stream().mapToInt(Integer::intValue).max().orElse(-1);
+        throw new RuntimeException(
+                String.format(
+                        "Too many buckets %s, you should increase target bucket row number %s.",
+                        maxBucket, targetBucketRowNumber));
+    }
+
+    /**
+     * Prepares a commit. Partition indexes accessed since the previous call are stamped with the
+     * given identifier; the others are evicted once their last access has been committed.
+     *
+     * @param commitIdentifier identifier of the commit being prepared
+     */
+    public void prepareCommit(long commitIdentifier) {
+        long latestCommittedIdentifier;
+        if (partitionIndex.values().stream()
+                        .mapToLong(i -> i.lastAccessedCommitIdentifier)
+                        .max()
+                        .orElse(Long.MIN_VALUE)
+                == Long.MIN_VALUE) {
+            // Optimization for the first commit.
+            //
+            // If this is the first commit, no index has previous modified commit, so the value of
+            // `latestCommittedIdentifier` does not matter.
+            //
+            // Without this optimization, we may need to scan through all snapshots only to find
+            // that there is no previous snapshot by this user, which is very inefficient.
+            latestCommittedIdentifier = Long.MIN_VALUE;
+        } else {
+            latestCommittedIdentifier =
+                    snapshotManager
+                            .latestSnapshotOfUser(commitUser)
+                            .map(Snapshot::commitIdentifier)
+                            .orElse(Long.MIN_VALUE);
+        }
+
+        Iterator<Map.Entry<BinaryRow, PartitionIndex>> iterator =
+                partitionIndex.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<BinaryRow, PartitionIndex> entry = iterator.next();
+            BinaryRow partition = entry.getKey();
+            PartitionIndex index = entry.getValue();
+            if (index.accessed) {
+                index.lastAccessedCommitIdentifier = commitIdentifier;
+            } else {
+                if (index.lastAccessedCommitIdentifier <= latestCommittedIdentifier) {
+                    // Clear index if no update, and if its latest modification has committed.
+                    //
+                    // We need a mechanism to clear index, otherwise there will be more and
+                    // more such as yesterday's partition that no longer needs to be accessed.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Removing index for partition {}. "
+                                        + "Index's last accessed identifier is {}, "
+                                        + "while latest committed identifier is {}",
+                                partition,
+                                index.lastAccessedCommitIdentifier,
+                                latestCommittedIdentifier);
+                    }
+                    iterator.remove();
+                }
+            }
+            index.accessed = false;
+        }
+    }
+
+    @VisibleForTesting
+    Set<BinaryRow> currentPartitions() {
+        return partitionIndex.keySet();
+    }
+
+    /** Routes a key hash or a bucket id to the id of the assigner that owns it. */
+    private int computeAssignId(int hash) {
+        return Math.abs(hash % numAssigners);
+    }
+
+    /**
+     * Loads the index of a partition from its hash index files. Only hashes routed to this
+     * assigner enter the hash-to-bucket map, while bucket sizes count every hash in the files.
+     */
+    private PartitionIndex loadIndex(BinaryRow partition) {
+        Int2ShortHashMap map = new Int2ShortHashMap();
+        List<IndexManifestEntry> files = indexFileHandler.scan(HASH_INDEX, partition);
+        Map<Integer, Long> buckets = new HashMap<>();
+        for (IndexManifestEntry file : files) {
+            try (IntIterator iterator = indexFileHandler.readHashIndex(file.indexFile())) {
+                while (true) {
+                    try {
+                        int hash = iterator.next();
+                        if (computeAssignId(hash) == assignId) {
+                            map.put(hash, (short) file.bucket());
+                        }
+                        buckets.merge(file.bucket(), 1L, Long::sum);
+                    } catch (EOFException ignored) {
+                        // end of the index file
+                        break;
+                    }
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+        return new PartitionIndex(map, buckets);
+    }
+
+    /** Hash-to-bucket index and bucket sizes of a single partition. */
+    private static class PartitionIndex {
+
+        // key hash -> bucket, only for hashes routed to this assigner
+        private final Int2ShortHashMap hash2Bucket;
+
+        // bucket -> number of key hashes in that bucket
+        private final Map<Integer, Long> bucketInformation;
+
+        // whether assign() touched this partition since the last prepareCommit
+        private boolean accessed;
+
+        // identifier of the last commit in which this partition was accessed
+        private long lastAccessedCommitIdentifier;
+
+        private PartitionIndex(Int2ShortHashMap hash2Bucket, Map<Integer, Long> bucketInformation) {
+            this.hash2Bucket = hash2Bucket;
+            this.bucketInformation = bucketInformation;
+            this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
+            this.accessed = true;
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
new file mode 100644
index 000000000..644be637f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.IntHashSet;
+import org.apache.paimon.utils.IntIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link IndexMaintainer} for dynamic bucket tables that maintains the set of key hashcodes
+ * stored in one bucket.
+ *
+ * <p>The set is restored from the bucket's hash index file of the given snapshot, if any, and a
+ * new index file holding the whole set is written on commit whenever records were added.
+ */
+public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
+
+    private final IndexFileHandler fileHandler;
+    // hashcodes of all keys known to be in this bucket
+    private final IntHashSet hashcode;
+
+    // whether records were added since the last prepareCommit
+    private boolean modified;
+
+    private HashIndexMaintainer(
+            IndexFileHandler fileHandler, Long snapshotId, BinaryRow partition, int bucket) {
+        this.fileHandler = fileHandler;
+        this.hashcode = restoreHashcode(fileHandler, snapshotId, partition, bucket);
+        this.modified = false;
+    }
+
+    /**
+     * Restores the key hashcodes of a bucket from its hash index file in {@code snapshotId}, or
+     * returns an empty set when there is no snapshot or no such index file.
+     */
+    private static IntHashSet restoreHashcode(
+            IndexFileHandler fileHandler, Long snapshotId, BinaryRow partition, int bucket) {
+        if (snapshotId == null) {
+            return new IntHashSet();
+        }
+        Optional<IndexFileMeta> indexFile =
+                fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX, partition, bucket);
+        if (!indexFile.isPresent()) {
+            return new IntHashSet();
+        }
+        IndexFileMeta file = indexFile.get();
+        // presize the set from the file's row count
+        IntHashSet hashcode = new IntHashSet((int) file.rowCount());
+        try (IntIterator iterator = fileHandler.readHashIndex(file)) {
+            while (true) {
+                try {
+                    hashcode.add(iterator.next());
+                } catch (EOFException ignored) {
+                    // end of the index file
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return hashcode;
+    }
+
+    /** Adds the hashcode of the record's key, which must be a {@link BinaryRow}. */
+    @Override
+    public void notifyNewRecord(KeyValue record) {
+        InternalRow key = record.key();
+        if (!(key instanceof BinaryRow)) {
+            throw new IllegalArgumentException("Unsupported key type: " + key.getClass());
+        }
+        hashcode.add(key.hashCode());
+        modified = true;
+    }
+
+    /** Writes the whole hashcode set to a new index file if it changed since the last call. */
+    @Override
+    public List<IndexFileMeta> prepareCommit() {
+        if (modified) {
+            IndexFileMeta entry =
+                    fileHandler.writeHashIndex(hashcode.size(), hashcode.toIntIterator());
+            modified = false;
+            return Collections.singletonList(entry);
+        }
+        return Collections.emptyList();
+    }
+
+    /** Factory to restore {@link HashIndexMaintainer}. */
+    public static class Factory implements IndexMaintainer.Factory<KeyValue> {
+
+        private final IndexFileHandler handler;
+
+        public Factory(IndexFileHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public IndexMaintainer<KeyValue> createOrRestore(
+                Long snapshotId, BinaryRow partition, int bucket) {
+            return new HashIndexMaintainer(handler, snapshotId, partition, bucket);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
new file mode 100644
index 000000000..8d61d46f8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.List;
+
+/**
+ * Maintains an index for the records written to one bucket of a partition.
+ *
+ * @param <T> type of the records
+ */
+public interface IndexMaintainer<T> {
+
+    /** Notifies the maintainer of a record that has been written to the bucket. */
+    void notifyNewRecord(T record);
+
+    /** Returns the new index files that should be included in the next commit. */
+    List<IndexFileMeta> prepareCommit();
+
+    /** Factory to restore {@link IndexMaintainer}. */
+    interface Factory<T> {
+
+        /**
+         * Creates the maintainer of a bucket, restoring its index from the given snapshot if any.
+         *
+         * @param snapshotId snapshot to restore from; may be null when there is no snapshot yet
+         * @param partition partition of the bucket
+         * @param bucket bucket id
+         */
+        IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow partition, int bucket);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 06062800f..093b96fe7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -22,7 +22,10 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -40,7 +43,6 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -54,13 +56,14 @@ import java.util.concurrent.Executors;
  * @param <T> type of record to write.
  */
 public abstract class AbstractFileStoreWrite<T>
-        implements FileStoreWrite<T>, 
Restorable<List<AbstractFileStoreWrite.State>> {
+        implements FileStoreWrite<T>, 
Restorable<List<AbstractFileStoreWrite.State<T>>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFileStoreWrite.class);
 
     private final String commitUser;
-    private final SnapshotManager snapshotManager;
+    protected final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
+    @Nullable private final IndexMaintainer.Factory<T> indexFactory;
 
     @Nullable protected IOManager ioManager;
 
@@ -70,10 +73,14 @@ public abstract class AbstractFileStoreWrite<T>
     private boolean emptyWriter = false;
 
     protected AbstractFileStoreWrite(
-            String commitUser, SnapshotManager snapshotManager, FileStoreScan 
scan) {
+            String commitUser,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            @Nullable IndexMaintainer.Factory<T> indexFactory) {
         this.commitUser = commitUser;
         this.snapshotManager = snapshotManager;
         this.scan = scan;
+        this.indexFactory = indexFactory;
 
         this.writers = new HashMap<>();
     }
@@ -96,8 +103,11 @@ public abstract class AbstractFileStoreWrite<T>
 
     @Override
     public void write(BinaryRow partition, int bucket, T data) throws 
Exception {
-        RecordWriter<T> writer = getWriterWrapper(partition, bucket).writer;
-        writer.write(data);
+        WriterContainer<T> container = getWriterWrapper(partition, bucket);
+        container.writer.write(data);
+        if (container.indexMaintainer != null) {
+            container.indexMaintainer.notifyNewRecord(data);
+        }
     }
 
     @Override
@@ -165,12 +175,17 @@ public abstract class AbstractFileStoreWrite<T>
                 WriterContainer<T> writerContainer = entry.getValue();
 
                 CommitIncrement increment = 
writerContainer.writer.prepareCommit(waitCompaction);
+                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                if (writerContainer.indexMaintainer != null) {
+                    newIndexFiles = 
writerContainer.indexMaintainer.prepareCommit();
+                }
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
                                 partition,
                                 bucket,
                                 increment.newFilesIncrement(),
-                                increment.compactIncrement());
+                                increment.compactIncrement(),
+                                new IndexIncrement(newIndexFiles));
                 result.add(committable);
 
                 if (committable.isEmpty()) {
@@ -219,8 +234,8 @@ public abstract class AbstractFileStoreWrite<T>
     }
 
     @Override
-    public List<State> checkpoint() {
-        List<State> result = new ArrayList<>();
+    public List<State<T>> checkpoint() {
+        List<State<T>> result = new ArrayList<>();
 
         for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> 
partitionEntry :
                 writers.entrySet()) {
@@ -245,25 +260,26 @@ public abstract class AbstractFileStoreWrite<T>
                 // compaction result might be updated during prepareCommit
                 Collection<DataFileMeta> dataFiles = 
writerContainer.writer.dataFiles();
                 result.add(
-                        new State(
+                        new State<>(
                                 partition,
                                 bucket,
                                 writerContainer.baseSnapshotId,
                                 writerContainer.lastModifiedCommitIdentifier,
                                 dataFiles,
+                                writerContainer.indexMaintainer,
                                 increment));
             }
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Extracted state " + result.toString());
+            LOG.debug("Extracted state " + result);
         }
         return result;
     }
 
     @Override
-    public void restore(List<State> states) {
-        for (State state : states) {
+    public void restore(List<State<T>> states) {
+        for (State<T> state : states) {
             RecordWriter<T> writer =
                     createWriter(
                             state.partition,
@@ -273,7 +289,7 @@ public abstract class AbstractFileStoreWrite<T>
                             compactExecutor());
             notifyNewWriter(writer);
             WriterContainer<T> writerContainer =
-                    new WriterContainer<>(writer, state.baseSnapshotId);
+                    new WriterContainer<>(writer, state.indexMaintainer, 
state.baseSnapshotId);
             writerContainer.lastModifiedCommitIdentifier = 
state.lastModifiedCommitIdentifier;
             writers.computeIfAbsent(state.partition, k -> new HashMap<>())
                     .put(state.bucket, writerContainer);
@@ -298,38 +314,27 @@ public abstract class AbstractFileStoreWrite<T>
         }
 
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        RecordWriter<T> writer;
-        if (emptyWriter) {
-            writer =
-                    createWriter(
-                            partition.copy(),
-                            bucket,
-                            Collections.emptyList(),
-                            null,
-                            compactExecutor());
-        } else {
-            writer =
-                    createWriter(
-                            partition.copy(),
-                            bucket,
-                            scanExistingFileMetas(latestSnapshotId, partition, 
bucket),
-                            null,
-                            compactExecutor());
+        List<DataFileMeta> restoreFiles = new ArrayList<>();
+        if (!emptyWriter && latestSnapshotId != null) {
+            restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, 
bucket);
         }
+        IndexMaintainer<T> indexMaintainer =
+                indexFactory == null
+                        ? null
+                        : indexFactory.createOrRestore(latestSnapshotId, 
partition, bucket);
+        RecordWriter<T> writer =
+                createWriter(partition.copy(), bucket, restoreFiles, null, 
compactExecutor());
         notifyNewWriter(writer);
-        return new WriterContainer<>(writer, latestSnapshotId);
+        return new WriterContainer<>(writer, indexMaintainer, 
latestSnapshotId);
     }
 
     private List<DataFileMeta> scanExistingFileMetas(
-            Long snapshotId, BinaryRow partition, int bucket) {
+            long snapshotId, BinaryRow partition, int bucket) {
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
-        if (snapshotId != null) {
-            // Concat all the DataFileMeta of existing files into 
existingFileMetas.
-            scan.withSnapshot(snapshotId).withPartitionBucket(partition, 
bucket).plan().files()
-                    .stream()
-                    .map(ManifestEntry::file)
-                    .forEach(existingFileMetas::add);
-        }
+        // Concat all the DataFileMeta of existing files into 
existingFileMetas.
+        scan.withSnapshot(snapshotId).withPartitionBucket(partition, 
bucket).plan().files().stream()
+                .map(ManifestEntry::file)
+                .forEach(existingFileMetas::add);
         return existingFileMetas;
     }
 
@@ -359,11 +364,16 @@ public abstract class AbstractFileStoreWrite<T>
     @VisibleForTesting
     public static class WriterContainer<T> {
         public final RecordWriter<T> writer;
+        @Nullable public final IndexMaintainer<T> indexMaintainer;
         protected final long baseSnapshotId;
         protected long lastModifiedCommitIdentifier;
 
-        protected WriterContainer(RecordWriter<T> writer, Long baseSnapshotId) 
{
+        protected WriterContainer(
+                RecordWriter<T> writer,
+                @Nullable IndexMaintainer<T> indexMaintainer,
+                Long baseSnapshotId) {
             this.writer = writer;
+            this.indexMaintainer = indexMaintainer;
             this.baseSnapshotId =
                     baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : 
baseSnapshotId;
             this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
@@ -371,13 +381,14 @@ public abstract class AbstractFileStoreWrite<T>
     }
 
     /** Recoverable state of {@link AbstractFileStoreWrite}. */
-    public static class State {
+    public static class State<T> {
         protected final BinaryRow partition;
         protected final int bucket;
 
         protected final long baseSnapshotId;
         protected final long lastModifiedCommitIdentifier;
         protected final List<DataFileMeta> dataFiles;
+        @Nullable protected final IndexMaintainer<T> indexMaintainer;
         protected final CommitIncrement commitIncrement;
 
         protected State(
@@ -386,24 +397,27 @@ public abstract class AbstractFileStoreWrite<T>
                 long baseSnapshotId,
                 long lastModifiedCommitIdentifier,
                 Collection<DataFileMeta> dataFiles,
+                @Nullable IndexMaintainer<T> indexMaintainer,
                 CommitIncrement commitIncrement) {
             this.partition = partition;
             this.bucket = bucket;
             this.baseSnapshotId = baseSnapshotId;
             this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
             this.dataFiles = new ArrayList<>(dataFiles);
+            this.indexMaintainer = indexMaintainer;
             this.commitIncrement = commitIncrement;
         }
 
         @Override
         public String toString() {
             return String.format(
-                    "{%s, %d, %d, %d, %s, %s}",
+                    "{%s, %d, %d, %d, %s, %s, %s}",
                     partition,
                     bucket,
                     baseSnapshotId,
                     lastModifiedCommitIdentifier,
                     dataFiles,
+                    indexMaintainer,
                     commitIncrement);
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 4cae8adce..fdfcb21c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -76,7 +76,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options) {
-        super(commitUser, snapshotManager, scan);
+        super(commitUser, snapshotManager, scan, null);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index be6ca924b..86abcc947 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -92,9 +93,10 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
+            @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
             CoreOptions options,
             KeyValueFieldsExtractor extractor) {
-        super(commitUser, snapshotManager, scan, options);
+        super(commitUser, snapshotManager, scan, options, indexFactory);
         this.fileIO = fileIO;
         this.keyType = keyType;
         this.valueType = valueType;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 40b8da562..6268eedd1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
@@ -32,6 +33,8 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Iterator;
 import java.util.Map;
 
@@ -54,8 +57,9 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
             String commitUser,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            CoreOptions options) {
-        super(commitUser, snapshotManager, scan);
+            CoreOptions options,
+            @Nullable IndexMaintainer.Factory<T> indexFactory) {
+        super(commitUser, snapshotManager, scan, indexFactory);
         this.options = options;
         this.cacheManager =
                 new CacheManager(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index db85d12b2..29da09274 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -89,9 +90,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
             case FIXED:
                 return new FixedBucketRowKeyExtractor(schema());
             case DYNAMIC:
-                // todo: dynamic bucket table need to check current bucket, 
when full, return the
-                // next one.
-                throw new UnsupportedOperationException("Not supported yet.");
+                return new DynamicBucketRowKeyExtractor(schema());
             case UNAWARE:
                 return new UnawareBucketRowKeyExtractor(schema());
             default:
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
new file mode 100644
index 000000000..73394a9f5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/** An {@link InternalRow} wraps another {@link InternalRow} with bucket. */
+public class DynamicBucketRow implements InternalRow {
+
+    private final InternalRow row;
+    private final int bucket;
+
+    public DynamicBucketRow(InternalRow row, int bucket) {
+        this.row = row;
+        this.bucket = bucket;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return row.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        row.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return row.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return row.getTimestamp(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return row.getArray(pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return row.getMap(pos);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return row.getRow(pos, numFields);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
new file mode 100644
index 000000000..6ae14b5f7
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/DynamicBucketRowKeyExtractor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * {@link KeyAndBucketExtractor} for {@link InternalRow} with extracting 
bucket from {@link
+ * DynamicBucketRow}.
+ */
+public class DynamicBucketRowKeyExtractor extends RowKeyExtractor {
+
+    public DynamicBucketRowKeyExtractor(TableSchema schema) {
+        super(schema);
+        int numBuckets = new CoreOptions(schema.options()).bucket();
+        checkArgument(
+                numBuckets == -1,
+                "Only 'bucket' = '-1' is allowed for 
'DynamicBucketRowKeyExtractor', but found: "
+                        + numBuckets);
+    }
+
+    @Override
+    public int bucket() {
+        if (record instanceof DynamicBucketRow) {
+            return ((DynamicBucketRow) record).bucket();
+        }
+        throw new IllegalArgumentException(
+                "Only supports DynamicBucketRow, illegal record type: " + 
record.getClass());
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 056625863..e3b524b9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -38,7 +38,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
  * @param <T> type of record to write into {@link FileStore}.
  */
 public class TableWriteImpl<T>
-        implements InnerTableWrite, 
Restorable<List<AbstractFileStoreWrite.State>> {
+        implements InnerTableWrite, 
Restorable<List<AbstractFileStoreWrite.State<T>>> {
 
     private final AbstractFileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
@@ -146,12 +146,12 @@ public class TableWriteImpl<T>
     }
 
     @Override
-    public List<AbstractFileStoreWrite.State> checkpoint() {
+    public List<AbstractFileStoreWrite.State<T>> checkpoint() {
         return write.checkpoint();
     }
 
     @Override
-    public void restore(List<AbstractFileStoreWrite.State> state) {
+    public void restore(List<AbstractFileStoreWrite.State<T>> state) {
         write.restore(state);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
new file mode 100644
index 000000000..4b10d329d
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/PrimaryKeyTableTestBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.UUID;
+
+/** Base class to test catalog primary key table. */
+public abstract class PrimaryKeyTableTestBase {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    protected AbstractFileStoreTable table;
+    protected String commitUser;
+
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        CatalogContext context = CatalogContext.create(new 
Path(tempPath.toUri()));
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        Identifier identifier = new Identifier("default", "T");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .options(tableOptions().toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (AbstractFileStoreTable) catalog.getTable(identifier);
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    protected Options tableOptions() {
+        return new Options();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
new file mode 100644
index 000000000..99eff3756
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.NewFilesIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.StreamTableCommit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HashBucketAssigner}. */
+public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
+
+    private IndexFileHandler fileHandler;
+    private StreamTableCommit commit;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        fileHandler = table.store().newIndexFileHandler();
+        commit = 
table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
+    }
+
+    private HashBucketAssigner createAssigner(int numAssigners, int assignId) {
+        return new HashBucketAssigner(
+                table.snapshotManager(), commitUser, fileHandler, 
numAssigners, assignId, 5);
+    }
+
+    @Test
+    public void testAssign() {
+        HashBucketAssigner assigner = createAssigner(2, 0);
+
+        // assign
+        assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 4)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 8)).isEqualTo(0);
+
+        // full
+        assertThat(assigner.assign(row(1), 10)).isEqualTo(2);
+
+        // another partition
+        assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
+
+        // read assigned
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+
+        // not mine
+        assertThatThrownBy(() -> assigner.assign(row(1), 1))
+                .hasMessageContaining("This is a bug, record assign id");
+    }
+
+    private CommitMessage createCommitMessage(BinaryRow partition, int bucket, 
IndexFileMeta file) {
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                new NewFilesIncrement(Collections.emptyList(), 
Collections.emptyList()),
+                new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+                new IndexIncrement(Collections.singletonList(file)));
+    }
+
+    @Test
+    public void testAssignRestore() {
+        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {0, 2});
+        IndexFileMeta bucket1 = fileHandler.writeHashIndex(new int[] {3, 5});
+        commit.commit(
+                0,
+                Arrays.asList(
+                        createCommitMessage(row(1), 0, bucket0),
+                        createCommitMessage(row(1), 1, bucket1)));
+
+        HashBucketAssigner assigner0 = createAssigner(3, 0);
+        HashBucketAssigner assigner2 = createAssigner(3, 2);
+
+        // read assigned
+        assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 3)).isEqualTo(1);
+        assertThat(assigner2.assign(row(1), 5)).isEqualTo(1);
+
+        // new assign
+        assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 9)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 12)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 15)).isEqualTo(3);
+    }
+
+    @Test
+    public void testIndexEliminate() {
+        HashBucketAssigner assigner = createAssigner(1, 0);
+
+        // checkpoint 0
+        assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner.assign(row(2), 0)).isEqualTo(0);
+        assigner.prepareCommit(0);
+        commit.commit(
+                0,
+                Arrays.asList(
+                        createCommitMessage(row(1), 0, 
fileHandler.writeHashIndex(new int[] {0})),
+                        createCommitMessage(row(2), 0, 
fileHandler.writeHashIndex(new int[] {0}))));
+        
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), 
row(2));
+
+        // checkpoint 1, but no commit
+        assertThat(assigner.assign(row(1), 1)).isEqualTo(0);
+        assigner.prepareCommit(1);
+        
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1));
+
+        // checkpoint 2
+        assigner.prepareCommit(2);
+        
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1));
+
+        // checkpoint 3 and commit checkpoint 1
+        commit.commit(
+                1,
+                Collections.singletonList(
+                        createCommitMessage(row(1), 0, 
fileHandler.writeHashIndex(new int[] {1}))));
+        assigner.prepareCommit(3);
+        assertThat(assigner.currentPartitions()).isEmpty();
+    }
+
+    @Test
+    public void testFileSize() {
+        List<Integer> ints = new ArrayList<>();
+        Random rnd = new Random();
+        for (int i = 0; i < 10_000_000; i++) {
+            ints.add(rnd.nextInt());
+        }
+
+        IndexFileMeta file =
+                
fileHandler.writeHashIndex(ints.stream().mapToInt(Integer::intValue).toArray());
+        System.out.println(file.fileName());
+        System.out.println(file.fileSize());
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
new file mode 100644
index 000000000..8c646457e
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexMaintainerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HashIndexMaintainer}. */
+public class HashIndexMaintainerTest extends PrimaryKeyTableTestBase {
+
+    private IndexFileHandler fileHandler;
+    private StreamWriteBuilder writeBuilder;
+    private StreamTableWrite write;
+    private StreamTableCommit commit;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        fileHandler = table.store().newIndexFileHandler();
+        writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+    }
+
+    @Override
+    protected Options tableOptions() {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, -1);
+        return options;
+    }
+
+    private DynamicBucketRow createRow(int partition, int bucket, int key, int 
value) {
+        return new DynamicBucketRow(GenericRow.of(partition, key, value), 
bucket);
+    }
+
+    private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> 
messages) {
+        Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
+        for (CommitMessage commitMessage : messages) {
+            CommitMessageImpl message = (CommitMessageImpl) commitMessage;
+            List<IndexFileMeta> files = 
message.indexIncrement().newIndexFiles();
+            if (files.isEmpty()) {
+                continue;
+            }
+            int[] ints =
+                    fileHandler.readHashIndexList(files.get(0)).stream()
+                            .mapToInt(Integer::intValue)
+                            .toArray();
+            index.computeIfAbsent(message.partition(), k -> new HashMap<>())
+                    .put(message.bucket(), ints);
+        }
+        return index;
+    }
+
+    @Test
+    public void testAssignBucket() throws Exception {
+        assertThatThrownBy(() -> write.write(GenericRow.of(1, 1, 1)))
+                .hasMessageContaining("Only supports DynamicBucketRow");
+
+        // commit two partitions
+        write.write(createRow(1, 1, 1, 1));
+        write.write(createRow(2, 2, 2, 2));
+        List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+        Map<BinaryRow, Map<Integer, int[]>> index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1), row(2));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        
assertThat(index.get(row(1)).get(1)).containsExactlyInAnyOrder(1465514398);
+        
assertThat(index.get(row(2)).get(2)).containsExactlyInAnyOrder(1340390384);
+        commit.commit(0, commitMessages);
+
+        // only one partition
+        write.write(createRow(1, 1, 2, 2));
+        commitMessages = write.prepareCommit(true, 1);
+        index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        
assertThat(index.get(row(1)).get(1)).containsExactlyInAnyOrder(1340390384, 
1465514398);
+        commit.commit(1, commitMessages);
+
+        // restore
+        write = writeBuilder.newWrite();
+        write.write(createRow(1, 1, 3, 3));
+        commitMessages = write.prepareCommit(true, 2);
+        index = readIndex(commitMessages);
+        assertThat(index).containsOnlyKeys(row(1));
+        assertThat(index.get(row(1))).containsOnlyKeys(1);
+        assertThat(index.get(row(1)).get(1))
+                .containsExactlyInAnyOrder(-771300025, 1340390384, 1465514398);
+
+        write.close();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
index 899f96b38..aec89ab4c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableWriteTest.java
@@ -127,10 +127,11 @@ public class TableWriteTest {
                             commitId++;
                             break;
                         case EXTRACT_STATE:
-                            List<AbstractFileStoreWrite.State> state = 
write.checkpoint();
+                            List<? extends AbstractFileStoreWrite.State<?>> 
state =
+                                    write.checkpoint();
                             write.close();
                             write = table.newWrite(commitUser);
-                            write.restore(state);
+                            write.restore((List) state);
                             break;
                     }
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 344413c46..3b2baf0b3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -151,9 +151,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             return;
         }
 
-        List<AbstractFileStoreWrite.State> states = write.checkpoint();
+        List<? extends AbstractFileStoreWrite.State<?>> states = 
write.checkpoint();
         write.close();
         write = newTableWrite(newTable);
-        write.restore(states);
+        write.restore((List) states);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 24f0b7164..3106ed679 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -185,6 +185,7 @@ public class TestChangelogDataReadWrite {
                                 pathFactory,
                                 snapshotManager,
                                 null, // not used, we only create an empty 
writer
+                                null,
                                 options,
                                 EXTRACTOR)
                         .createWriterContainer(partition, bucket, true)

Reply via email to