This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cef2fc18d [flink] Introduce zorder/order sort compact for dynamic
bucket table (#2100)
cef2fc18d is described below
commit cef2fc18d3ceab949ee05ee56422a6bc0781190c
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 10 11:03:36 2023 +0800
[flink] Introduce zorder/order sort compact for dynamic bucket table (#2100)
---
.../{IndexMaintainer.java => BucketAssigner.java} | 15 +-
.../apache/paimon/index/HashBucketAssigner.java | 4 +-
.../apache/paimon/index/HashIndexMaintainer.java | 15 +-
.../org/apache/paimon/index/IndexMaintainer.java | 5 +-
.../paimon/index/SimpleHashBucketAssigner.java | 93 +++++++++
.../paimon/operation/AbstractFileStoreWrite.java | 3 +-
.../paimon/index/SimpleHashBucketAssignerTest.java | 69 +++++++
.../paimon/table/DynamicBucketTableTest.java | 109 +++++++++++
.../org/apache/paimon/table/TableTestBase.java | 1 -
.../paimon/flink/action/SortCompactAction.java | 16 +-
.../flink/sink/DynamicBucketCompactSink.java | 61 ++++++
.../paimon/flink/sink/DynamicBucketSink.java | 8 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 17 +-
.../flink/sink/HashBucketAssignerOperator.java | 28 ++-
.../paimon/flink/DynamicBucketTableITCase.java | 40 ++++
.../SortCompactActionForDynamicBucketITCase.java | 215 +++++++++++++++++++++
...> SortCompactActionForUnawareBucketITCase.java} | 18 +-
17 files changed, 664 insertions(+), 53 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
similarity index 71%
copy from paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
copy to paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
index 8d61d46f8..f87d6a16f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java
@@ -20,17 +20,10 @@ package org.apache.paimon.index;
import org.apache.paimon.data.BinaryRow;
-import java.util.List;
+/** Assign a bucket for a record; only used in dynamic bucket table. */
+public interface BucketAssigner {
-/** Maintainer to maintain index. */
-public interface IndexMaintainer<T> {
+ int assign(BinaryRow partition, int hash);
- void notifyNewRecord(T record);
-
- List<IndexFileMeta> prepareCommit();
-
- /** Factory to restore {@link IndexMaintainer}. */
- interface Factory<T> {
- IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow
partition, int bucket);
- }
+ void prepareCommit(long commitIdentifier);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 329d3b971..519a495b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -34,7 +34,7 @@ import java.util.Set;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Assign bucket for key hashcode. */
-public class HashBucketAssigner {
+public class HashBucketAssigner implements BucketAssigner {
private static final Logger LOG =
LoggerFactory.getLogger(HashBucketAssigner.class);
@@ -64,6 +64,7 @@ public class HashBucketAssigner {
}
/** Assign a bucket for key hash of a record. */
+ @Override
public int assign(BinaryRow partition, int hash) {
int recordAssignId = computeAssignId(hash);
checkArgument(
@@ -88,6 +89,7 @@ public class HashBucketAssigner {
}
/** Prepare commit to clear outdated partition index. */
+ @Override
public void prepareCommit(long commitIdentifier) {
long latestCommittedIdentifier;
if (partitionIndex.values().stream()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
index 998c72399..66a8d6409 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -19,11 +19,14 @@
package org.apache.paimon.index;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.IntHashSet;
import org.apache.paimon.utils.IntIterator;
+import javax.annotation.Nullable;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -40,7 +43,10 @@ public class HashIndexMaintainer implements
IndexMaintainer<KeyValue> {
private boolean modified;
private HashIndexMaintainer(
- IndexFileHandler fileHandler, Long snapshotId, BinaryRow
partition, int bucket) {
+ IndexFileHandler fileHandler,
+ @Nullable Long snapshotId,
+ BinaryRow partition,
+ int bucket) {
this.fileHandler = fileHandler;
IntHashSet hashcode = new IntHashSet();
if (snapshotId != null) {
@@ -93,6 +99,11 @@ public class HashIndexMaintainer implements
IndexMaintainer<KeyValue> {
return Collections.emptyList();
}
+ @VisibleForTesting
+ public boolean isEmpty() {
+ return hashcode.size() == 0;
+ }
+
/** Factory to restore {@link HashIndexMaintainer}. */
public static class Factory implements IndexMaintainer.Factory<KeyValue> {
@@ -104,7 +115,7 @@ public class HashIndexMaintainer implements
IndexMaintainer<KeyValue> {
@Override
public IndexMaintainer<KeyValue> createOrRestore(
- Long snapshotId, BinaryRow partition, int bucket) {
+ @Nullable Long snapshotId, BinaryRow partition, int bucket) {
return new HashIndexMaintainer(handler, snapshotId, partition,
bucket);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
index 8d61d46f8..ba881d614 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java
@@ -20,6 +20,8 @@ package org.apache.paimon.index;
import org.apache.paimon.data.BinaryRow;
+import javax.annotation.Nullable;
+
import java.util.List;
/** Maintainer to maintain index. */
@@ -31,6 +33,7 @@ public interface IndexMaintainer<T> {
/** Factory to restore {@link IndexMaintainer}. */
interface Factory<T> {
- IndexMaintainer<T> createOrRestore(Long snapshotId, BinaryRow
partition, int bucket);
+ IndexMaintainer<T> createOrRestore(
+ @Nullable Long snapshotId, BinaryRow partition, int bucket);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
new file mode 100644
index 000000000..7094684fc
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.utils.Int2ShortHashMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** When we need to overwrite the table, we should use this to avoid loading
the index. */
+public class SimpleHashBucketAssigner implements BucketAssigner {
+
+ private final int numAssigners;
+ private final int assignId;
+ private final long targetBucketRowNumber;
+
+ private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;
+
+ public SimpleHashBucketAssigner(int numAssigners, int assignId, long
targetBucketRowNumber) {
+ this.numAssigners = numAssigners;
+ this.assignId = assignId;
+ this.targetBucketRowNumber = targetBucketRowNumber;
+ this.partitionIndex = new HashMap<>();
+ }
+
+ @Override
+ public int assign(BinaryRow partition, int hash) {
+ SimplePartitionIndex index =
+ this.partitionIndex.computeIfAbsent(partition, p -> new
SimplePartitionIndex());
+ return index.assign(hash);
+ }
+
+ @Override
+ public void prepareCommit(long commitIdentifier) {
+ // do nothing
+ }
+
+ /** Simple partition bucket hash assigner. */
+ private class SimplePartitionIndex {
+
+ public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap();
+ private final Map<Integer, Long> bucketInformation;
+ private int currentBucket;
+
+ private SimplePartitionIndex() {
+ bucketInformation = new HashMap<>();
+ loadNewBucket();
+ }
+
+ public int assign(int hash) {
+ // the same hash should go into the same bucket
+ if (hash2Bucket.containsKey(hash)) {
+ return hash2Bucket.get(hash);
+ }
+
+ Long num = bucketInformation.computeIfAbsent(currentBucket, i ->
0L);
+ if (num >= targetBucketRowNumber) {
+ loadNewBucket();
+ }
+ bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L
: l + 1);
+ hash2Bucket.put(hash, (short) currentBucket);
+ return currentBucket;
+ }
+
+ private void loadNewBucket() {
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ if (i % numAssigners == assignId &&
!bucketInformation.containsKey(i)) {
+ currentBucket = i;
+ return;
+ }
+ }
+ throw new RuntimeException(
+ "Can't find a suitable bucket to assign, all the bucket
are assigned?");
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index ad2fbb720..125756a01 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -330,7 +330,8 @@ public abstract class AbstractFileStoreWrite<T>
IndexMaintainer<T> indexMaintainer =
indexFactory == null
? null
- : indexFactory.createOrRestore(latestSnapshotId,
partition, bucket);
+ : indexFactory.createOrRestore(
+ ignorePreviousFiles ? null : latestSnapshotId,
partition, bucket);
RecordWriter<T> writer =
createWriter(partition.copy(), bucket, restoreFiles, null,
compactExecutor());
notifyNewWriter(writer);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
new file mode 100644
index 000000000..d795b8926
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryRow;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link SimpleHashBucketAssigner}. */
+public class SimpleHashBucketAssignerTest {
+
+ @Test
+ public void testAssign() {
+ SimpleHashBucketAssigner simpleHashBucketAssigner = new
SimpleHashBucketAssigner(2, 0, 100);
+
+ BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+ int hash = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(0);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(2);
+ }
+
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(4);
+ }
+
+ @Test
+ public void testAssignWithSameHash() {
+ SimpleHashBucketAssigner simpleHashBucketAssigner = new
SimpleHashBucketAssigner(2, 0, 100);
+
+ BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+ int hash = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(0);
+ }
+
+ // reset hash, the record will go into bucket 0
+ hash = 0;
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(0);
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
new file mode 100644
index 000000000..d8f5e11a8
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.HashIndexMaintainer;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchWriteBuilderImpl;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Tests for Dynamic Bucket Table. */
+public class DynamicBucketTableTest extends TableTestBase {
+
+ @Test
+ public void testOverwriteDynamicBucketTable() throws Exception {
+ createTableDefault();
+
+ commitDefault(writeDataDefault(100, 100));
+
+ Table table = getTableDefault();
+ BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl)
table.newBatchWriteBuilder();
+ TableWriteImpl batchTableWrite = (TableWriteImpl)
builder.withOverwrite().newWrite();
+ HashIndexMaintainer indexMaintainer =
+ (HashIndexMaintainer)
+ batchTableWrite
+ .getWrite()
+ .createWriterContainer(BinaryRow.EMPTY_ROW, 0,
true)
+ .indexMaintainer;
+
+ Assertions.assertThat(indexMaintainer.isEmpty()).isTrue();
+ batchTableWrite.write(data(0));
+ Assertions.assertThat(
+ ((CommitMessageImpl)
batchTableWrite.prepareCommit().get(0))
+ .indexIncrement()
+ .newIndexFiles()
+ .get(0)
+ .rowCount())
+ .isEqualTo(1);
+ }
+
+ protected List<CommitMessage> writeDataDefault(int size, int times) throws
Exception {
+ List<CommitMessage> messages;
+ Table table = getTableDefault();
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+ for (int i = 0; i < times; i++) {
+ for (int j = 0; j < size; j++) {
+ batchTableWrite.write(data(i));
+ }
+ }
+ messages = batchTableWrite.prepareCommit();
+ }
+
+ return messages;
+ }
+
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.BIGINT());
+ schemaBuilder.column("f1", DataTypes.BIGINT());
+ schemaBuilder.column("f2", DataTypes.BIGINT());
+ schemaBuilder.column("f3", DataTypes.BIGINT());
+ schemaBuilder.option("bucket", "-1");
+ schemaBuilder.option("scan.parallelism", "6");
+ schemaBuilder.option("sink.parallelism", "3");
+ schemaBuilder.option("dynamic-bucket.target-row-num", "100");
+ schemaBuilder.primaryKey("f0");
+ return schemaBuilder.build();
+ }
+
+ private static InternalRow data(int bucket) {
+ GenericRow row =
+ GenericRow.of(
+ RANDOM.nextLong(),
+ (long) RANDOM.nextInt(10000),
+ (long) RANDOM.nextInt(10000),
+ (long) RANDOM.nextInt(10000));
+ return new DynamicBucketRow(row, bucket);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 322d0c8ad..8bd03ca7f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -180,7 +180,6 @@ public abstract class TableTestBase {
}
}
- // schema with all the basic types.
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index e8d54d8b2..b665714d0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
@@ -45,8 +44,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Compact with sort action. */
public class SortCompactAction extends CompactAction {
@@ -63,9 +60,6 @@ public class SortCompactAction extends CompactAction {
Map<String, String> tableConf) {
super(warehouse, database, tableName, catalogConfig, tableConf);
- checkArgument(
- table instanceof AppendOnlyFileStoreTable,
- "Only sort compaction works with append-only table for now.");
table =
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
}
@@ -85,10 +79,9 @@ public class SortCompactAction extends CompactAction {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
}
FileStoreTable fileStoreTable = (FileStoreTable) table;
- if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
- throw new IllegalArgumentException("Sort Compact only supports
append-only table yet");
- }
- if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
+
+ if (fileStoreTable.bucketMode() != BucketMode.UNAWARE
+ && fileStoreTable.bucketMode() != BucketMode.DYNAMIC) {
throw new IllegalArgumentException("Sort Compact only supports
bucket=-1 yet.");
}
Map<String, String> tableConfig = fileStoreTable.options();
@@ -120,8 +113,7 @@ public class SortCompactAction extends CompactAction {
new FlinkSinkBuilder(fileStoreTable)
.withInput(sorter.sort())
- // This should use empty map to tag it on overwrite action,
otherwise there is no
- // overwrite action.
+ .forCompact(true)
.withOverwritePartition(new HashMap<>())
.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
new file mode 100644
index 000000000..11e21eed6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.UUID;
+
+/** This class is only used to generate the compact sink topology for dynamic
bucket table. */
+public class DynamicBucketCompactSink extends RowDynamicBucketSink {
+
+ public DynamicBucketCompactSink(
+ FileStoreTable table, @Nullable Map<String, String>
overwritePartition) {
+ super(table, overwritePartition);
+ }
+
+ @Override
+ public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable
Integer parallelism) {
+ String initialCommitUser = UUID.randomUUID().toString();
+
+ // This input is sorted and compacted. So there is no shuffle here, we
just assign bucket
+ // for each record, and sink them to table.
+
+ // bucket-assigner
+ HashBucketAssignerOperator<InternalRow> assignerOperator =
+ new HashBucketAssignerOperator<>(
+ initialCommitUser, table, extractorFunction(), true);
+ TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
+ new TupleTypeInfo<>(input.getType(),
BasicTypeInfo.INT_TYPE_INFO);
+ DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
+ input.transform("dynamic-bucket-assigner", rowWithBucketType,
assignerOperator)
+ .setParallelism(input.getParallelism());
+ return sinkFrom(bucketAssigned, initialCommitUser);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index 31a455f74..5efa87356 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -57,7 +57,8 @@ public abstract class DynamicBucketSink<T> extends
FlinkWriteSink<Tuple2<T, Inte
String initialCommitUser = UUID.randomUUID().toString();
// Topology:
- // input -- shuffle by key hash --> bucket-assigner -- shuffle by
bucket --> writer -->
+ // input -- shuffle by key hash --> bucket-assigner -- shuffle by
partition & bucket -->
+ // writer -->
// committer
// 1. shuffle by key hash
@@ -70,7 +71,8 @@ public abstract class DynamicBucketSink<T> extends
FlinkWriteSink<Tuple2<T, Inte
// 2. bucket-assigner
HashBucketAssignerOperator<T> assignerOperator =
- new HashBucketAssignerOperator<>(initialCommitUser, table,
extractorFunction());
+ new HashBucketAssignerOperator<>(
+ initialCommitUser, table, extractorFunction(), false);
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(),
BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<T, Integer>> bucketAssigned =
@@ -78,7 +80,7 @@ public abstract class DynamicBucketSink<T> extends
FlinkWriteSink<Tuple2<T, Inte
.transform("dynamic-bucket-assigner",
rowWithBucketType, assignerOperator)
.setParallelism(partitionByKeyHash.getParallelism());
- // 3. shuffle by bucket
+ // 3. shuffle by partition & bucket
DataStream<Tuple2<T, Integer>> partitionByBucket =
partition(bucketAssigned, channelComputer2(), parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 2e3a783f0..56ead8a85 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,6 +44,7 @@ public class FlinkSinkBuilder {
@Nullable private Map<String, String> overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
+ private boolean compactSink = false;
public FlinkSinkBuilder(FileStoreTable table) {
this.table = table;
@@ -78,6 +79,11 @@ public class FlinkSinkBuilder {
return this;
}
+ public FlinkSinkBuilder forCompact(boolean compactSink) {
+ this.compactSink = compactSink;
+ return this;
+ }
+
public DataStreamSink<?> build() {
DataStream<InternalRow> input = MapToInternalRow.map(this.input,
table.rowType());
if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
@@ -108,9 +114,14 @@ public class FlinkSinkBuilder {
private DataStreamSink<?> buildDynamicBucketSink(
DataStream<InternalRow> input, boolean globalIndex) {
checkArgument(logSinkFunction == null, "Dynamic bucket mode can not
work with log system.");
- return globalIndex
- ? new GlobalDynamicBucketSink(table,
overwritePartition).build(input, parallelism)
- : new RowDynamicBucketSink(table,
overwritePartition).build(input, parallelism);
+ return compactSink && !globalIndex
+ // todo support global index sort compact
+ ? new DynamicBucketCompactSink(table,
overwritePartition).build(input, parallelism)
+ : globalIndex
+ ? new GlobalDynamicBucketSink(table,
overwritePartition)
+ .build(input, parallelism)
+ : new RowDynamicBucketSink(table, overwritePartition)
+ .build(input, parallelism);
}
private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow>
input) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 2d256d9bf..eb7934418 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -18,7 +18,9 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.index.SimpleHashBucketAssigner;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
@@ -41,17 +43,20 @@ public class HashBucketAssignerOperator<T> extends
AbstractStreamOperator<Tuple2
private final AbstractFileStoreTable table;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction;
+ private final boolean overwrite;
- private transient HashBucketAssigner assigner;
+ private transient BucketAssigner assigner;
private transient PartitionKeyExtractor<T> extractor;
public HashBucketAssignerOperator(
String commitUser,
Table table,
- SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction) {
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction,
+ boolean overwrite) {
this.initialCommitUser = commitUser;
this.table = (AbstractFileStoreTable) table;
this.extractorFunction = extractorFunction;
+ this.overwrite = overwrite;
}
@Override
@@ -66,13 +71,18 @@ public class HashBucketAssignerOperator<T> extends
AbstractStreamOperator<Tuple2
context, "commit_user_state", String.class,
initialCommitUser);
this.assigner =
- new HashBucketAssigner(
- table.snapshotManager(),
- commitUser,
- table.store().newIndexFileHandler(),
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask(),
- table.coreOptions().dynamicBucketTargetRowNum());
+ overwrite
+ ? new SimpleHashBucketAssigner(
+
getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+
table.coreOptions().dynamicBucketTargetRowNum())
+ : new HashBucketAssigner(
+ table.snapshotManager(),
+ commitUser,
+ table.store().newIndexFileHandler(),
+
getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+
table.coreOptions().dynamicBucketTargetRowNum());
this.extractor = extractorFunction.apply(table.schema());
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 8e9ce02e6..85e71b731 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -18,12 +18,24 @@
package org.apache.paimon.flink;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.AbstractFileStoreTable;
+
import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for batch file store. */
@@ -74,4 +86,32 @@ public class DynamicBucketTableITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
.containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
}
+
+ @Test
+ public void testOverwrite() throws Exception {
+ sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4),
(1, 5, 5)");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 1, 1),
+ Row.of(1, 2, 2),
+ Row.of(1, 3, 3),
+ Row.of(1, 4, 4),
+ Row.of(1, 5, 5));
+
+        // Overwrite the whole table; this SQL should also update the index file.
+ sql("INSERT OVERWRITE T SELECT * FROM T LIMIT 4");
+
+ AbstractFileStoreTable table =
+ (AbstractFileStoreTable)
+
(CatalogFactory.createCatalog(CatalogContext.create(new Path(path))))
+ .getTable(Identifier.create("default", "T"));
+ IndexFileHandler indexFileHandler =
table.store().newIndexFileHandler();
+ List<BinaryRow> partitions = table.newScan().listPartitions();
+ List<IndexManifestEntry> entries = new ArrayList<>();
+ partitions.forEach(p ->
entries.addAll(indexFileHandler.scan(HASH_INDEX, p)));
+
+ Long records =
+ entries.stream().map(entry ->
entry.indexFile().rowCount()).reduce(Long::sum).get();
+ Assertions.assertThat(records).isEqualTo(4);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
new file mode 100644
index 000000000..4d219e0a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.DynamicBucketRow;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/** Sort Compact Action tests for dynamic bucket table. */
+public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testDynamicBucketSort() throws Exception {
+ createTable();
+
+ commit(writeData(100));
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(getTable().rowType());
+ Predicate predicate = predicateBuilder.between(1, 100L, 200L);
+
+ List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilter =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+
+ zorder(Arrays.asList("f2", "f1"));
+
+ List<ManifestEntry> filesZorder =
+ ((FileStoreTable) getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilterZorder =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+ Assertions.assertThat(filesFilterZorder.size() / (double)
filesZorder.size())
+ .isLessThan(filesFilter.size() / (double) files.size());
+ }
+
+ @Test
+ public void testDynamicBucketSortWithOrderAndZorder() throws Exception {
+ createTable();
+
+ commit(writeData(100));
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(getTable().rowType());
+ Predicate predicate = predicateBuilder.between(1, 100L, 200L);
+
+        // Ordering by (f2, f1) makes a predicate on f1 perform worse.
+ order(Arrays.asList("f2", "f1"));
+ List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilter =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+
+ zorder(Arrays.asList("f2", "f1"));
+
+ List<ManifestEntry> filesZorder =
+ ((FileStoreTable) getTable()).store().newScan().plan().files();
+ List<ManifestEntry> filesFilterZorder =
+ ((ChangelogWithKeyFileStoreTable) getTable())
+ .store()
+ .newScan()
+ .withValueFilter(predicate)
+ .plan()
+ .files();
+
+ Assertions.assertThat(filesFilterZorder.size() / (double)
filesZorder.size())
+ .isLessThan(filesFilter.size() / (double) files.size());
+ }
+
+ private void zorder(List<String> columns) throws Exception {
+ if (RANDOM.nextBoolean()) {
+ new SortCompactAction(
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyMap(),
+ Collections.emptyMap())
+ .withOrderStrategy("zorder")
+ .withOrderColumns(columns)
+ .run();
+ } else {
+ callProcedure("zorder", columns);
+ }
+ }
+
+ private void order(List<String> columns) throws Exception {
+ if (RANDOM.nextBoolean()) {
+ new SortCompactAction(
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyMap(),
+ Collections.emptyMap())
+ .withOrderStrategy("order")
+ .withOrderColumns(columns)
+ .run();
+ } else {
+ callProcedure("order", columns);
+ }
+ }
+
+ private void callProcedure(String orderStrategy, List<String>
orderByColumns) {
+ callProcedure(
+ String.format(
+ "CALL compact('%s.%s', 'ALL', '%s', '%s')",
+ database, tableName, orderStrategy, String.join(",",
orderByColumns)),
+ false,
+ true);
+ }
+
+    // Schema with four BIGINT columns; f0 is the primary key.
+ private static Schema schema() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.BIGINT());
+ schemaBuilder.column("f1", DataTypes.BIGINT());
+ schemaBuilder.column("f2", DataTypes.BIGINT());
+ schemaBuilder.column("f3", DataTypes.BIGINT());
+ schemaBuilder.option("bucket", "-1");
+ schemaBuilder.option("scan.parallelism", "6");
+ schemaBuilder.option("sink.parallelism", "3");
+ schemaBuilder.option("dynamic-bucket.target-row-num", "100");
+ schemaBuilder.primaryKey("f0");
+ return schemaBuilder.build();
+ }
+
+ private List<CommitMessage> writeData(int size) throws Exception {
+ List<CommitMessage> messages;
+ Table table = getTable();
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+ for (int i = 0; i < size; i++) {
+ for (int j = 0; j < 100; j++) {
+ batchTableWrite.write(data(i));
+ }
+ }
+ messages = batchTableWrite.prepareCommit();
+ }
+
+ return messages;
+ }
+
+ private void commit(List<CommitMessage> messages) throws Exception {
+ getTable().newBatchWriteBuilder().newCommit().commit(messages);
+ }
+
+ private void createTable() throws Exception {
+ catalog.createDatabase(database, true);
+ catalog.createTable(identifier(), schema(), true);
+ }
+
+ private Table getTable() throws Exception {
+ return catalog.getTable(identifier());
+ }
+
+ private Identifier identifier() {
+ return Identifier.create(database, tableName);
+ }
+
+ private static InternalRow data(int bucket) {
+ GenericRow row =
+ GenericRow.of(
+ RANDOM.nextLong(),
+ (long) RANDOM.nextInt(10000),
+ (long) RANDOM.nextInt(10000),
+ (long) RANDOM.nextInt(10000));
+ return new DynamicBucketRow(row, bucket);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
similarity index 96%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 152ca35e9..00bdbecae 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -48,9 +48,9 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/** Order Rewrite Action tests for {@link SortCompactAction}. */
-public class SortCompactActionITCase extends ActionITCaseBase {
+public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
- private static final Random random = new Random();
+ private static final Random RANDOM = new Random();
private void prepareData(int size, int loop) throws Exception {
createTable();
@@ -230,7 +230,7 @@ public class SortCompactActionITCase extends
ActionITCaseBase {
}
private void zorder(List<String> columns) throws Exception {
- if (random.nextBoolean()) {
+ if (RANDOM.nextBoolean()) {
new SortCompactAction(
warehouse,
database,
@@ -246,7 +246,7 @@ public class SortCompactActionITCase extends
ActionITCaseBase {
}
private void order(List<String> columns) throws Exception {
- if (random.nextBoolean()) {
+ if (RANDOM.nextBoolean()) {
new SortCompactAction(
warehouse,
database,
@@ -270,12 +270,12 @@ public class SortCompactActionITCase extends
ActionITCaseBase {
true);
}
- public void createTable() throws Exception {
+ private void createTable() throws Exception {
catalog.createDatabase(database, true);
catalog.createTable(identifier(), schema(), true);
}
- public Identifier identifier() {
+ private Identifier identifier() {
return Identifier.create(database, tableName);
}
@@ -319,7 +319,7 @@ public class SortCompactActionITCase extends
ActionITCaseBase {
return messages;
}
- public Table getTable() throws Exception {
+ private Table getTable() throws Exception {
return catalog.getTable(identifier());
}
@@ -356,8 +356,8 @@ public class SortCompactActionITCase extends
ActionITCaseBase {
}
private static byte[] randomBytes() {
- byte[] binary = new byte[random.nextInt(10)];
- random.nextBytes(binary);
+ byte[] binary = new byte[RANDOM.nextInt(10)];
+ RANDOM.nextBytes(binary);
return binary;
}
}