This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 6925f02 Remove shuffle for unaware append table write (#90)
6925f02 is described below
commit 6925f02722452cd9032ce2c1773702f0a6482ccc
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 26 14:43:37 2024 +0800
Remove shuffle for unaware append table write (#90)
---
.../org/apache/paimon/trino/TrinoMetadata.java | 86 +++++++---------------
.../trino/TrinoNodePartitioningProvider.java | 17 ++---
.../paimon/trino/UnawareTableShuffleFunction.java | 66 -----------------
3 files changed, 34 insertions(+), 135 deletions(-)
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index b987ede..359bad3 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -120,43 +121,25 @@ public class TrinoMetadata implements ConnectorMetadata {
if (!(table instanceof FileStoreTable)) {
throw new IllegalArgumentException(table.getClass() + " is not supported");
}
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- switch (fileStoreTable.bucketMode()) {
+ FileStoreTable storeTable = (FileStoreTable) table;
+ BucketMode bucketMode = storeTable.bucketMode();
+ switch (bucketMode) {
case FIXED:
try {
return Optional.of(
new ConnectorTableLayout(
new TrinoPartitioningHandle(
- InstantiationUtil.serializeObject(
- fileStoreTable.schema()),
+ InstantiationUtil.serializeObject(storeTable.schema()),
FIXED),
- fileStoreTable.schema().bucketKeys(),
+ storeTable.schema().bucketKeys(),
false));
} catch (IOException e) {
throw new RuntimeException(e);
}
- case DYNAMIC:
- case GLOBAL_DYNAMIC:
- if (table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only primary-key table can support dynamic bucket.");
- }
- throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
case UNAWARE:
- try {
- return Optional.of(
- new ConnectorTableLayout(
- new TrinoPartitioningHandle(
- InstantiationUtil.serializeObject(
- fileStoreTable.schema()),
- UNAWARE),
- fileStoreTable.schema().partitionKeys(),
- true));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return Optional.empty();
default:
- throw new IllegalArgumentException("Unknown bucket mode");
+ throw new IllegalArgumentException("Unknown table bucket mode: " + bucketMode);
}
}
@@ -242,15 +225,15 @@ public class TrinoMetadata implements ConnectorMetadata {
ConnectorSession session, ConnectorTableHandle tableHandle) {
TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
Table table = trinoTableHandle.table(catalog);
- try {
- if (table.getClass()
- == Class.forName("org.apache.paimon.table.AppendOnlyFileStoreTable")) {
- throw new IllegalArgumentException("Append-only table does not support upsert");
- }
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
+ if (!(table instanceof FileStoreTable)) {
+ throw new IllegalArgumentException(table.getClass() + " is not supported");
+ }
+ FileStoreTable storeTable = (FileStoreTable) table;
+ BucketMode bucketMode = storeTable.bucketMode();
+ if (bucketMode != FIXED) {
+ throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
}
- Set<String> pkSet = table.primaryKeys().stream().collect(Collectors.toSet());
+ Set<String> pkSet = new HashSet<>(table.primaryKeys());
DataField[] row =
table.rowType().getFields().stream()
.filter(dataField -> pkSet.contains(dataField.name()))
@@ -267,32 +250,17 @@ public class TrinoMetadata implements ConnectorMetadata {
if (!(table instanceof FileStoreTable)) {
throw new IllegalArgumentException(table.getClass() + " is not supported");
}
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- switch (fileStoreTable.bucketMode()) {
- case FIXED:
- try {
- return Optional.of(
- new TrinoPartitioningHandle(
- InstantiationUtil.serializeObject(fileStoreTable.schema()),
- FIXED));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- case DYNAMIC:
- case GLOBAL_DYNAMIC:
- if (table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only primary-key table can support dynamic bucket.");
- }
- throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
- case UNAWARE:
- if (!table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only append table can support unaware bucket.");
- }
- throw new IllegalArgumentException("Unaware bucket mode are not supported");
- default:
- throw new IllegalArgumentException("Unknown bucket mode");
+ FileStoreTable storeTable = (FileStoreTable) table;
+ BucketMode bucketMode = storeTable.bucketMode();
+ if (bucketMode != FIXED) {
+ throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
+ }
+ try {
+ return Optional.of(
+ new TrinoPartitioningHandle(
+ InstantiationUtil.serializeObject(storeTable.schema()), FIXED));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
index c4c5bbb..0aed279 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -18,6 +18,8 @@
package org.apache.paimon.trino;
+import org.apache.paimon.table.BucketMode;
+
import com.google.inject.Inject;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -44,16 +46,11 @@ public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningP
// todo support dynamic bucket tables
TrinoPartitioningHandle trinoPartitioningHandle =
(TrinoPartitioningHandle) partitioningHandle;
- switch (trinoPartitioningHandle.getBucketMode()) {
- case FIXED:
- return new FixedBucketTableShuffleFunction(
- partitionChannelTypes, trinoPartitioningHandle, workerCount);
- case UNAWARE:
- return new UnawareTableShuffleFunction(
- partitionChannelTypes, trinoPartitioningHandle, workerCount);
- default:
- throw new UnsupportedOperationException(
- "Unsupported bucket mode: " + trinoPartitioningHandle.getBucketMode());
+ if (trinoPartitioningHandle.getBucketMode() == BucketMode.FIXED) {
+ return new FixedBucketTableShuffleFunction(
+ partitionChannelTypes, trinoPartitioningHandle, workerCount);
}
+ throw new UnsupportedOperationException(
+ "Unsupported table bucket mode: " + trinoPartitioningHandle.getBucketMode());
}
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
deleted file mode 100644
index cc6aa58..0000000
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.RowKind;
-
-import io.trino.spi.Page;
-import io.trino.spi.connector.BucketFunction;
-import io.trino.spi.type.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/** Trino {@link BucketFunction}. */
-public class UnawareTableShuffleFunction implements BucketFunction {
- private static final Logger LOG = LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
- private final int workerCount;
- private final boolean hasPartitionKeys;
- private final ThreadLocal<Projection> projectionContext;
-
- public UnawareTableShuffleFunction(
- List<Type> partitionChannelTypes,
- TrinoPartitioningHandle partitioningHandle,
- int workerCount) {
- this.hasPartitionKeys = partitionChannelTypes.size() > 0;
- TableSchema schema = partitioningHandle.getOriginalSchema();
- this.projectionContext =
- ThreadLocal.withInitial(
- () ->
- CodeGenUtils.newProjection(
- schema.logicalPartitionType(), schema.partitionKeys()));
- this.workerCount = workerCount;
- }
-
- @Override
- public int getBucket(Page page, int position) {
- if (!hasPartitionKeys) {
- return 0;
- } else {
- TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
- BinaryRow partition = projectionContext.get().apply(trinoRow);
- return partition.hashCode() % workerCount;
- }
- }
-}