This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cef2fc18d [flink] Introduce zorder/order sort compact for dynamic bucket table (#2100)
cef2fc18d is described below

commit cef2fc18d3ceab949ee05ee56422a6bc0781190c
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 10 11:03:36 2023 +0800

    [flink] Introduce zorder/order sort compact for dynamic bucket table (#2100)
---
 .../{IndexMaintainer.java => BucketAssigner.java}  |  15 +-
 .../apache/paimon/index/HashBucketAssigner.java    |   4 +-
 .../apache/paimon/index/HashIndexMaintainer.java   |  15 +-
 .../org/apache/paimon/index/IndexMaintainer.java   |   5 +-
 .../paimon/index/SimpleHashBucketAssigner.java     |  93 +++++++++
 .../paimon/operation/AbstractFileStoreWrite.java   |   3 +-
 .../paimon/index/SimpleHashBucketAssignerTest.java |  69 +++++++
 .../paimon/table/DynamicBucketTableTest.java       | 109 +++++++++++
 .../org/apache/paimon/table/TableTestBase.java     |   1 -
 .../paimon/flink/action/SortCompactAction.java     |  16 +-
 .../flink/sink/DynamicBucketCompactSink.java       |  61 ++++++
 .../paimon/flink/sink/DynamicBucketSink.java       |   8 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |  17 +-
 .../flink/sink/HashBucketAssignerOperator.java     |  28 ++-
 .../paimon/flink/DynamicBucketTableITCase.java     |  40 ++++
 .../SortCompactActionForDynamicBucketITCase.java   | 215 +++++++++++++++++++++
 ...> SortCompactActionForUnawareBucketITCase.java} |  18 +-
 17 files changed, 664 insertions(+), 53 deletions(-)
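
With this change, sort compaction also works on dynamic bucket (bucket = -1) tables, either
through SortCompactAction or through the compact procedure exercised by the new ITCases. A
minimal sketch of the procedure call, assuming an illustrative table default.T sorted by
columns f2 and f1:

    -- zorder sort compaction on a dynamic bucket table
    CALL compact('default.T', 'ALL', 'zorder', 'f2,f1')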

diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
similarity index 71%
copy from paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
copy to paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
index 8d61d46f8..f87d6a16f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
@@ -20,17 +20,10 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.data.BinaryRow;
 
-import java.util.List;
+/** Assigns a bucket for a record; only used in dynamic bucket tables. */
+public interface BucketAssigner {
 
-/** Maintainer to maintain index. */
-public interface IndexMaintainer<T> {
+    int assign(BinaryRow partition, int hash);
 
-    void notifyNewRecord(T record);
-
-    List<IndexFileMeta> prepareCommit();
-
-    /** Factory to restore {@link IndexMaintainer}. */
-    interface Factory<T> {
-        IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow partition, int bucket);
-    }
+    void prepareCommit(long commitIdentifier);
 }
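
The new interface keeps the contract small: assign a bucket per record, then call
prepareCommit once per commit. A minimal caller-side sketch, assuming the partition, key
hash, and commit identifier have already been extracted (all three names are placeholders):

    // pick an implementation; the simple assigner never loads the hash index
    BucketAssigner assigner = new SimpleHashBucketAssigner(numAssigners, assignId, targetRowNum);
    int bucket = assigner.assign(partition, keyHash); // route the record to this bucket
    assigner.prepareCommit(commitIdentifier);         // a no-op for SimpleHashBucketAssigner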
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 329d3b971..519a495b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -34,7 +34,7 @@ import java.util.Set;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Assign bucket for key hashcode. */
-public class HashBucketAssigner {
+public class HashBucketAssigner implements BucketAssigner {
 
     private static final Logger LOG = LoggerFactory.getLogger(HashBucketAssigner.class);
 
@@ -64,6 +64,7 @@ public class HashBucketAssigner {
     }
 
     /** Assign a bucket for key hash of a record. */
+    @Override
     public int assign(BinaryRow partition, int hash) {
         int recordAssignId = computeAssignId(hash);
         checkArgument(
@@ -88,6 +89,7 @@ public class HashBucketAssigner {
     }
 
     /** Prepare commit to clear outdated partition index. */
+    @Override
     public void prepareCommit(long commitIdentifier) {
         long latestCommittedIdentifier;
         if (partitionIndex.values().stream()
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
index 998c72399..66a8d6409 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.utils.IntHashSet;
 import org.apache.paimon.utils.IntIterator;
 
+import javax.annotation.Nullable;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -40,7 +43,10 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
     private boolean modified;
 
     private HashIndexMaintainer(
-            IndexFileHandler fileHandler, Long snapshotId, BinaryRow partition, int bucket) {
+            IndexFileHandler fileHandler,
+            @Nullable Long snapshotId,
+            BinaryRow partition,
+            int bucket) {
         this.fileHandler = fileHandler;
         IntHashSet hashcode = new IntHashSet();
         if (snapshotId != null) {
@@ -93,6 +99,11 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
         return Collections.emptyList();
     }
 
+    @VisibleForTesting
+    public boolean isEmpty() {
+        return hashcode.size() == 0;
+    }
+
     /** Factory to restore {@link HashIndexMaintainer}. */
     public static class Factory implements IndexMaintainer.Factory<KeyValue> {
 
@@ -104,7 +115,7 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {
 
         @Override
         public IndexMaintainer<KeyValue> createOrRestore(
-                Long snapshotId, BinaryRow partition, int bucket) {
+                @Nullable Long snapshotId, BinaryRow partition, int bucket) {
             return new HashIndexMaintainer(handler, snapshotId, partition, bucket);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
index 8d61d46f8..ba881d614 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
@@ -20,6 +20,8 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.data.BinaryRow;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Maintainer to maintain index. */
@@ -31,6 +33,7 @@ public interface IndexMaintainer<T> {
 
     /** Factory to restore {@link IndexMaintainer}. */
     interface Factory<T> {
-        IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow partition, int bucket);
+        IndexMaintainer<T> createOrRestore(
+                @Nullable Long snapshotId, BinaryRow partition, int bucket);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
new file mode 100644
index 000000000..7094684fc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.Int2ShortHashMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** When we need to overwrite the table, we should use this assigner to avoid loading the existing index. */
+public class SimpleHashBucketAssigner implements BucketAssigner {
+
+    private final int numAssigners;
+    private final int assignId;
+    private final long targetBucketRowNumber;
+
+    private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;
+
+    public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) {
+        this.numAssigners = numAssigners;
+        this.assignId = assignId;
+        this.targetBucketRowNumber = targetBucketRowNumber;
+        this.partitionIndex = new HashMap<>();
+    }
+
+    @Override
+    public int assign(BinaryRow partition, int hash) {
+        SimplePartitionIndex index =
+                this.partitionIndex.computeIfAbsent(partition, p -> new SimplePartitionIndex());
+        return index.assign(hash);
+    }
+
+    @Override
+    public void prepareCommit(long commitIdentifier) {
+        // do nothing
+    }
+
+    /** Simple partition bucket hash assigner. */
+    private class SimplePartitionIndex {
+
+        public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap();
+        private final Map<Integer, Long> bucketInformation;
+        private int currentBucket;
+
+        private SimplePartitionIndex() {
+            bucketInformation = new HashMap<>();
+            loadNewBucket();
+        }
+
+        public int assign(int hash) {
+            // the same hash should go into the same bucket
+            if (hash2Bucket.containsKey(hash)) {
+                return hash2Bucket.get(hash);
+            }
+
+            Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
+            if (num >= targetBucketRowNumber) {
+                loadNewBucket();
+            }
+            bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
+            hash2Bucket.put(hash, (short) currentBucket);
+            return currentBucket;
+        }
+
+        private void loadNewBucket() {
+            for (int i = 0; i < Short.MAX_VALUE; i++) {
+                if (i % numAssigners == assignId && !bucketInformation.containsKey(i)) {
+                    currentBucket = i;
+                    return;
+                }
+            }
+            throw new RuntimeException(
+                    "Can't find a suitable bucket to assign; are all the buckets already assigned?");
+        }
+    }
+}
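
A worked example of the rule above: an assigner only claims buckets whose index satisfies
i % numAssigners == assignId, and it moves on to the next such bucket once
targetBucketRowNumber rows have been assigned. The values below mirror the new
SimpleHashBucketAssignerTest; someHash is a placeholder:

    // assigner 0 of 2, at most 100 rows per bucket
    SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(2, 0, 100);
    // the first 100 distinct hashes land in bucket 0, the next 100 in bucket 2,
    // then bucket 4; odd-numbered buckets belong to assigner 1
    int bucket = assigner.assign(BinaryRow.EMPTY_ROW, someHash);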
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index ad2fbb720..125756a01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -330,7 +330,8 @@ public abstract class AbstractFileStoreWrite<T>
         IndexMaintainer<T> indexMaintainer =
                 indexFactory == null
                         ? null
-                        : indexFactory.createOrRestore(latestSnapshotId, partition, bucket);
+                        : indexFactory.createOrRestore(
+                                ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
         RecordWriter<T> writer =
                 createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor());
         notifyNewWriter(writer);
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
new file mode 100644
index 000000000..d795b8926
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link SimpleHashBucketAssigner}. */
+public class SimpleHashBucketAssignerTest {
+
+    @Test
+    public void testAssign() {
+        SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);
+
+        BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+        int hash = 0;
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(0);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(2);
+        }
+
+        int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+        Assertions.assertThat(bucket).isEqualTo(4);
+    }
+
+    @Test
+    public void testAssignWithSameHash() {
+        SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);
+
+        BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+        int hash = 0;
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(0);
+        }
+
+        // reset hash; the records will go into bucket 0 again
+        hash = 0;
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(0);
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
new file mode 100644
index 000000000..d8f5e11a8
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchWriteBuilderImpl;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Tests for Dynamic Bucket Table. */
+public class DynamicBucketTableTest extends TableTestBase {
+
+    @Test
+    public void testOverwriteDynamicBucketTable() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(100, 100));
+
+        Table table = getTableDefault();
+        BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl) table.newBatchWriteBuilder();
+        TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
+        HashIndexMaintainer indexMaintainer =
+                (HashIndexMaintainer)
+                        batchTableWrite
+                                .getWrite()
+                                .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
+                                .indexMaintainer;
+
+        Assertions.assertThat(indexMaintainer.isEmpty()).isTrue();
+        batchTableWrite.write(data(0));
+        Assertions.assertThat(
+                        ((CommitMessageImpl) batchTableWrite.prepareCommit().get(0))
+                                .indexIncrement()
+                                .newIndexFiles()
+                                .get(0)
+                                .rowCount())
+                .isEqualTo(1);
+    }
+
+    protected List<CommitMessage> writeDataDefault(int size, int times) throws Exception {
+        List<CommitMessage> messages;
+        Table table = getTableDefault();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < times; i++) {
+                for (int j = 0; j < size; j++) {
+                    batchTableWrite.write(data(i));
+                }
+            }
+            messages = batchTableWrite.prepareCommit();
+        }
+
+        return messages;
+    }
+
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.BIGINT());
+        schemaBuilder.column("f1", DataTypes.BIGINT());
+        schemaBuilder.column("f2", DataTypes.BIGINT());
+        schemaBuilder.column("f3", DataTypes.BIGINT());
+        schemaBuilder.option("bucket", "-1");
+        schemaBuilder.option("scan.parallelism", "6");
+        schemaBuilder.option("sink.parallelism", "3");
+        schemaBuilder.option("dynamic-bucket.target-row-num", "100");
+        schemaBuilder.primaryKey("f0");
+        return schemaBuilder.build();
+    }
+
+    private static InternalRow data(int bucket) {
+        GenericRow row =
+                GenericRow.of(
+                        RANDOM.nextLong(),
+                        (long) RANDOM.nextInt(10000),
+                        (long) RANDOM.nextInt(10000),
+                        (long) RANDOM.nextInt(10000));
+        return new DynamicBucketRow(row, bucket);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 322d0c8ad..8bd03ca7f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -180,7 +180,6 @@ public abstract class TableTestBase {
         }
     }
 
-    // schema with all the basic types.
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index e8d54d8b2..b665714d0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -45,8 +44,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** Compact with sort action. */
 public class SortCompactAction extends CompactAction {
 
@@ -63,9 +60,6 @@ public class SortCompactAction extends CompactAction {
             Map<String, String> tableConf) {
         super(warehouse, database, tableName, catalogConfig, tableConf);
 
-        checkArgument(
-                table instanceof AppendOnlyFileStoreTable,
-                "Only sort compaction works with append-only table for now.");
         table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
     }
 
@@ -85,10 +79,9 @@ public class SortCompactAction extends CompactAction {
             env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         }
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
-            throw new IllegalArgumentException("Sort Compact only supports append-only table yet");
-        }
-        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
+
+        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE
+                && fileStoreTable.bucketMode() != BucketMode.DYNAMIC) {
             throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
         }
         Map<String, String> tableConfig = fileStoreTable.options();
@@ -120,8 +113,7 @@ public class SortCompactAction extends CompactAction {
 
         new FlinkSinkBuilder(fileStoreTable)
                 .withInput(sorter.sort())
-                // This should use empty map to tag it on overwrite action, otherwise there is no
-                // overwrite action.
+                .forCompact(true)
                 .withOverwritePartition(new HashMap<>())
                 .build();
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
new file mode 100644
index 000000000..11e21eed6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.UUID;
+
+/** This class is only used to generate the compact sink topology for dynamic bucket tables. */
+public class DynamicBucketCompactSink extends RowDynamicBucketSink {
+
+    public DynamicBucketCompactSink(
+            FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
+        super(table, overwritePartition);
+    }
+
+    @Override
+    public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
+        String initialCommitUser = UUID.randomUUID().toString();
+
+        // This input is sorted and compacted, so there is no shuffle here: we just assign a
+        // bucket for each record and sink it to the table.
+
+        // bucket-assigner
+        HashBucketAssignerOperator<InternalRow> assignerOperator =
+                new HashBucketAssignerOperator<>(
+                        initialCommitUser, table, extractorFunction(), true);
+        TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
+                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
+        DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
+                input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
+                        .setParallelism(input.getParallelism());
+        return sinkFrom(bucketAssigned, initialCommitUser);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index 31a455f74..5efa87356 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -57,7 +57,8 @@ public abstract class DynamicBucketSink<T> extends 
FlinkWriteSink<Tuple2<T, Inte
         String initialCommitUser = UUID.randomUUID().toString();
 
         // Topology:
-        // input -- shuffle by key hash --> bucket-assigner -- shuffle by bucket --> writer -->
+        // input -- shuffle by key hash --> bucket-assigner -- shuffle by partition & bucket -->
+        // writer -->
         // committer
 
         // 1. shuffle by key hash
@@ -70,7 +71,8 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
 
         // 2. bucket-assigner
         HashBucketAssignerOperator<T> assignerOperator =
-                new HashBucketAssignerOperator<>(initialCommitUser, table, extractorFunction());
+                new HashBucketAssignerOperator<>(
+                        initialCommitUser, table, extractorFunction(), false);
         TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
                 new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
         DataStream<Tuple2<T, Integer>> bucketAssigned =
@@ -78,7 +80,7 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
                         .transform("dynamic-bucket-assigner", 
rowWithBucketType, assignerOperator)
                         .setParallelism(partitionByKeyHash.getParallelism());
 
-        // 3. shuffle by bucket
+        // 3. shuffle by partition & bucket
         DataStream<Tuple2<T, Integer>> partitionByBucket =
                 partition(bucketAssigned, channelComputer2(), parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 2e3a783f0..56ead8a85 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,6 +44,7 @@ public class FlinkSinkBuilder {
     @Nullable private Map<String, String> overwritePartition;
     @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
+    private boolean compactSink = false;
 
     public FlinkSinkBuilder(FileStoreTable table) {
         this.table = table;
@@ -78,6 +79,11 @@ public class FlinkSinkBuilder {
         return this;
     }
 
+    public FlinkSinkBuilder forCompact(boolean compactSink) {
+        this.compactSink = compactSink;
+        return this;
+    }
+
     public DataStreamSink<?> build() {
         DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -108,9 +114,14 @@ public class FlinkSinkBuilder {
     private DataStreamSink<?> buildDynamicBucketSink(
             DataStream<InternalRow> input, boolean globalIndex) {
         checkArgument(logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
-        return globalIndex
-                ? new GlobalDynamicBucketSink(table, overwritePartition).build(input, parallelism)
-                : new RowDynamicBucketSink(table, overwritePartition).build(input, parallelism);
+        return compactSink && !globalIndex
+                // todo support global index sort compact
+                ? new DynamicBucketCompactSink(table, overwritePartition).build(input, parallelism)
+                : globalIndex
+                        ? new GlobalDynamicBucketSink(table, overwritePartition)
+                                .build(input, parallelism)
+                        : new RowDynamicBucketSink(table, overwritePartition)
+                                .build(input, parallelism);
     }
 
     private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 2d256d9bf..eb7934418 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.index.BucketAssigner;
 import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.index.SimpleHashBucketAssigner;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
@@ -41,17 +43,20 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
 
     private final AbstractFileStoreTable table;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
+    private final boolean overwrite;
 
-    private transient HashBucketAssigner assigner;
+    private transient BucketAssigner assigner;
     private transient PartitionKeyExtractor<T> extractor;
 
     public HashBucketAssignerOperator(
             String commitUser,
             Table table,
-            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction) {
+            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
+            boolean overwrite) {
         this.initialCommitUser = commitUser;
         this.table = (AbstractFileStoreTable) table;
         this.extractorFunction = extractorFunction;
+        this.overwrite = overwrite;
     }
 
     @Override
@@ -66,13 +71,18 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
         this.assigner =
-                new HashBucketAssigner(
-                        table.snapshotManager(),
-                        commitUser,
-                        table.store().newIndexFileHandler(),
-                        getRuntimeContext().getNumberOfParallelSubtasks(),
-                        getRuntimeContext().getIndexOfThisSubtask(),
-                        table.coreOptions().dynamicBucketTargetRowNum());
+                overwrite
+                        ? new SimpleHashBucketAssigner(
+                                getRuntimeContext().getNumberOfParallelSubtasks(),
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                table.coreOptions().dynamicBucketTargetRowNum())
+                        : new HashBucketAssigner(
+                                table.snapshotManager(),
+                                commitUser,
+                                table.store().newIndexFileHandler(),
+                                getRuntimeContext().getNumberOfParallelSubtasks(),
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                table.coreOptions().dynamicBucketTargetRowNum());
         this.extractor = extractorFunction.apply(table.schema());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 8e9ce02e6..85e71b731 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -18,12 +18,24 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.AbstractFileStoreTable;
+
 import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for batch file store. */
@@ -74,4 +86,32 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
     }
+
+    @Test
+    public void testOverwrite() throws Exception {
+        sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1, 1),
+                        Row.of(1, 2, 2),
+                        Row.of(1, 3, 3),
+                        Row.of(1, 4, 4),
+                        Row.of(1, 5, 5));
+
+        // overwrite the whole table; this SQL should update the index file
+        sql("INSERT OVERWRITE T SELECT * FROM T LIMIT 4");
+
+        AbstractFileStoreTable table =
+                (AbstractFileStoreTable)
+                        (CatalogFactory.createCatalog(CatalogContext.create(new Path(path))))
+                                .getTable(Identifier.create("default", "T"));
+        IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
+        List<BinaryRow> partitions = table.newScan().listPartitions();
+        List<IndexManifestEntry> entries = new ArrayList<>();
+        partitions.forEach(p -> entries.addAll(indexFileHandler.scan(HASH_INDEX, p)));
+
+        Long records =
+                entries.stream().map(entry -> entry.indexFile().rowCount()).reduce(Long::sum).get();
+        Assertions.assertThat(records).isEqualTo(4);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
new file mode 100644
index 000000000..4d219e0a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/** Sort Compact Action tests for dynamic bucket table. */
+public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testDynamicBucketSort() throws Exception {
+        createTable();
+
+        commit(writeData(100));
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 100L, 200L);
+
+        List<ManifestEntry> files = ((FileStoreTable) getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilter =
+                ((ChangelogWithKeyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+
+        zorder(Arrays.asList("f2", "f1"));
+
+        List<ManifestEntry> filesZorder =
+                ((FileStoreTable) getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilterZorder =
+                ((ChangelogWithKeyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(filesFilterZorder.size() / (double) filesZorder.size())
+                .isLessThan(filesFilter.size() / (double) files.size());
+    }
+
+    @Test
+    public void testDynamicBucketSortWithOrderAndZorder() throws Exception {
+        createTable();
+
+        commit(writeData(100));
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 100L, 200L);
+
+        // order f2,f1 will make predicate of f1 perform worse.
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> files = ((FileStoreTable) getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilter =
+                ((ChangelogWithKeyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+
+        zorder(Arrays.asList("f2", "f1"));
+
+        List<ManifestEntry> filesZorder =
+                ((FileStoreTable) getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilterZorder =
+                ((ChangelogWithKeyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+
+        Assertions.assertThat(filesFilterZorder.size() / (double) filesZorder.size())
+                .isLessThan(filesFilter.size() / (double) files.size());
+    }
+
+    private void zorder(List<String> columns) throws Exception {
+        if (RANDOM.nextBoolean()) {
+            new SortCompactAction(
+                            warehouse,
+                            database,
+                            tableName,
+                            Collections.emptyMap(),
+                            Collections.emptyMap())
+                    .withOrderStrategy("zorder")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("zorder", columns);
+        }
+    }
+
+    private void order(List<String> columns) throws Exception {
+        if (RANDOM.nextBoolean()) {
+            new SortCompactAction(
+                            warehouse,
+                            database,
+                            tableName,
+                            Collections.emptyMap(),
+                            Collections.emptyMap())
+                    .withOrderStrategy("order")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("order", columns);
+        }
+    }
+
+    private void callProcedure(String orderStrategy, List<String> orderByColumns) {
+        callProcedure(
+                String.format(
+                        "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+                        database, tableName, orderStrategy, String.join(",", orderByColumns)),
+                false,
+                true);
+    }
+
+    // schema for the dynamic bucket table: four BIGINT columns, bucket = -1.
+    private static Schema schema() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.BIGINT());
+        schemaBuilder.column("f1", DataTypes.BIGINT());
+        schemaBuilder.column("f2", DataTypes.BIGINT());
+        schemaBuilder.column("f3", DataTypes.BIGINT());
+        schemaBuilder.option("bucket", "-1");
+        schemaBuilder.option("scan.parallelism", "6");
+        schemaBuilder.option("sink.parallelism", "3");
+        schemaBuilder.option("dynamic-bucket.target-row-num", "100");
+        schemaBuilder.primaryKey("f0");
+        return schemaBuilder.build();
+    }
+
+    private List<CommitMessage> writeData(int size) throws Exception {
+        List<CommitMessage> messages;
+        Table table = getTable();
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < size; i++) {
+                for (int j = 0; j < 100; j++) {
+                    batchTableWrite.write(data(i));
+                }
+            }
+            messages = batchTableWrite.prepareCommit();
+        }
+
+        return messages;
+    }
+
+    private void commit(List<CommitMessage> messages) throws Exception {
+        getTable().newBatchWriteBuilder().newCommit().commit(messages);
+    }
+
+    private void createTable() throws Exception {
+        catalog.createDatabase(database, true);
+        catalog.createTable(identifier(), schema(), true);
+    }
+
+    private Table getTable() throws Exception {
+        return catalog.getTable(identifier());
+    }
+
+    private Identifier identifier() {
+        return Identifier.create(database, tableName);
+    }
+
+    private static InternalRow data(int bucket) {
+        GenericRow row =
+                GenericRow.of(
+                        RANDOM.nextLong(),
+                        (long) RANDOM.nextInt(10000),
+                        (long) RANDOM.nextInt(10000),
+                        (long) RANDOM.nextInt(10000));
+        return new DynamicBucketRow(row, bucket);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 152ca35e9..00bdbecae 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -48,9 +48,9 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Order Rewrite Action tests for {@link SortCompactAction}. */
-public class SortCompactActionITCase extends ActionITCaseBase {
+public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
 
-    private static final Random random = new Random();
+    private static final Random RANDOM = new Random();
 
     private void prepareData(int size, int loop) throws Exception {
         createTable();
@@ -230,7 +230,7 @@ public class SortCompactActionITCase extends ActionITCaseBase {
     }
 
     private void zorder(List<String> columns) throws Exception {
-        if (random.nextBoolean()) {
+        if (RANDOM.nextBoolean()) {
             new SortCompactAction(
                             warehouse,
                             database,
@@ -246,7 +246,7 @@ public class SortCompactActionITCase extends ActionITCaseBase {
     }
 
     private void order(List<String> columns) throws Exception {
-        if (random.nextBoolean()) {
+        if (RANDOM.nextBoolean()) {
             new SortCompactAction(
                             warehouse,
                             database,
@@ -270,12 +270,12 @@ public class SortCompactActionITCase extends ActionITCaseBase {
                 true);
     }
 
-    public void createTable() throws Exception {
+    private void createTable() throws Exception {
         catalog.createDatabase(database, true);
         catalog.createTable(identifier(), schema(), true);
     }
 
-    public Identifier identifier() {
+    private Identifier identifier() {
         return Identifier.create(database, tableName);
     }
 
@@ -319,7 +319,7 @@ public class SortCompactActionITCase extends ActionITCaseBase {
         return messages;
     }
 
-    public Table getTable() throws Exception {
+    private Table getTable() throws Exception {
         return catalog.getTable(identifier());
     }
 
@@ -356,8 +356,8 @@ public class SortCompactActionITCase extends ActionITCaseBase {
     }
 
     private static byte[] randomBytes() {
-        byte[] binary = new byte[random.nextInt(10)];
-        random.nextBytes(binary);
+        byte[] binary = new byte[RANDOM.nextInt(10)];
+        RANDOM.nextBytes(binary);
         return binary;
     }
 }

