This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6925f02  Remove shuffle for unaware append table write (#90)
6925f02 is described below

commit 6925f02722452cd9032ce2c1773702f0a6482ccc
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 26 14:43:37 2024 +0800

    Remove shuffle for unaware append table write (#90)
---
 .../org/apache/paimon/trino/TrinoMetadata.java     | 86 +++++++---------------
 .../trino/TrinoNodePartitioningProvider.java       | 17 ++---
 .../paimon/trino/UnawareTableShuffleFunction.java  | 66 -----------------
 3 files changed, 34 insertions(+), 135 deletions(-)

diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index b987ede..359bad3 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -120,43 +121,25 @@ public class TrinoMetadata implements ConnectorMetadata {
         if (!(table instanceof FileStoreTable)) {
             throw new IllegalArgumentException(table.getClass() + " is not supported");
         }
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        switch (fileStoreTable.bucketMode()) {
+        FileStoreTable storeTable = (FileStoreTable) table;
+        BucketMode bucketMode = storeTable.bucketMode();
+        switch (bucketMode) {
             case FIXED:
                 try {
                     return Optional.of(
                             new ConnectorTableLayout(
                                     new TrinoPartitioningHandle(
-                                            InstantiationUtil.serializeObject(
-                                                    fileStoreTable.schema()),
+                                            InstantiationUtil.serializeObject(storeTable.schema()),
                                             FIXED),
-                                    fileStoreTable.schema().bucketKeys(),
+                                    storeTable.schema().bucketKeys(),
                                     false));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
-            case DYNAMIC:
-            case GLOBAL_DYNAMIC:
-                if (table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Only primary-key table can support dynamic bucket.");
-                }
-                throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
             case UNAWARE:
-                try {
-                    return Optional.of(
-                            new ConnectorTableLayout(
-                                    new TrinoPartitioningHandle(
-                                            InstantiationUtil.serializeObject(
-                                                    fileStoreTable.schema()),
-                                            UNAWARE),
-                                    fileStoreTable.schema().partitionKeys(),
-                                    true));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
+                return Optional.empty();
             default:
-                throw new IllegalArgumentException("Unknown bucket mode");
+                throw new IllegalArgumentException("Unknown table bucket mode: " + bucketMode);
         }
     }
 
@@ -242,15 +225,15 @@ public class TrinoMetadata implements ConnectorMetadata {
             ConnectorSession session, ConnectorTableHandle tableHandle) {
         TrinoTableHandle trinoTableHandle = (TrinoTableHandle) tableHandle;
         Table table = trinoTableHandle.table(catalog);
-        try {
-            if (table.getClass()
-                    == Class.forName("org.apache.paimon.table.AppendOnlyFileStoreTable")) {
-                throw new IllegalArgumentException("Append-only table does not support upsert");
-            }
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+        if (!(table instanceof FileStoreTable)) {
+            throw new IllegalArgumentException(table.getClass() + " is not supported");
+        }
+        FileStoreTable storeTable = (FileStoreTable) table;
+        BucketMode bucketMode = storeTable.bucketMode();
+        if (bucketMode != FIXED) {
+            throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
         }
-        Set<String> pkSet = table.primaryKeys().stream().collect(Collectors.toSet());
+        Set<String> pkSet = new HashSet<>(table.primaryKeys());
         DataField[] row =
                 table.rowType().getFields().stream()
                         .filter(dataField -> pkSet.contains(dataField.name()))
@@ -267,32 +250,17 @@ public class TrinoMetadata implements ConnectorMetadata {
         if (!(table instanceof FileStoreTable)) {
             throw new IllegalArgumentException(table.getClass() + " is not supported");
         }
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        switch (fileStoreTable.bucketMode()) {
-            case FIXED:
-                try {
-                    return Optional.of(
-                            new TrinoPartitioningHandle(
-                                    InstantiationUtil.serializeObject(fileStoreTable.schema()),
-                                    FIXED));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            case DYNAMIC:
-            case GLOBAL_DYNAMIC:
-                if (table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Only primary-key table can support dynamic bucket.");
-                }
-                throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
-            case UNAWARE:
-                if (!table.primaryKeys().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Only append table can support unaware bucket.");
-                }
-                throw new IllegalArgumentException("Unaware bucket mode are not supported");
-            default:
-                throw new IllegalArgumentException("Unknown bucket mode");
+        FileStoreTable storeTable = (FileStoreTable) table;
+        BucketMode bucketMode = storeTable.bucketMode();
+        if (bucketMode != FIXED) {
+            throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
+        }
+        try {
+            return Optional.of(
+                    new TrinoPartitioningHandle(
+                            InstantiationUtil.serializeObject(storeTable.schema()), FIXED));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
index c4c5bbb..0aed279 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.trino;
 
+import org.apache.paimon.table.BucketMode;
+
 import com.google.inject.Inject;
 import io.trino.spi.connector.BucketFunction;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -44,16 +46,11 @@ public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningP
         // todo support dynamic bucket tables
         TrinoPartitioningHandle trinoPartitioningHandle =
                 (TrinoPartitioningHandle) partitioningHandle;
-        switch (trinoPartitioningHandle.getBucketMode()) {
-            case FIXED:
-                return new FixedBucketTableShuffleFunction(
-                        partitionChannelTypes, trinoPartitioningHandle, workerCount);
-            case UNAWARE:
-                return new UnawareTableShuffleFunction(
-                        partitionChannelTypes, trinoPartitioningHandle, workerCount);
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported bucket mode: " + trinoPartitioningHandle.getBucketMode());
+        if (trinoPartitioningHandle.getBucketMode() == BucketMode.FIXED) {
+            return new FixedBucketTableShuffleFunction(
+                    partitionChannelTypes, trinoPartitioningHandle, workerCount);
         }
+        throw new UnsupportedOperationException(
+                "Unsupported table bucket mode: " + trinoPartitioningHandle.getBucketMode());
     }
 }
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
deleted file mode 100644
index cc6aa58..0000000
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.RowKind;
-
-import io.trino.spi.Page;
-import io.trino.spi.connector.BucketFunction;
-import io.trino.spi.type.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/** Trino {@link BucketFunction}. */
-public class UnawareTableShuffleFunction implements BucketFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
-    private final int workerCount;
-    private final boolean hasPartitionKeys;
-    private final ThreadLocal<Projection> projectionContext;
-
-    public UnawareTableShuffleFunction(
-            List<Type> partitionChannelTypes,
-            TrinoPartitioningHandle partitioningHandle,
-            int workerCount) {
-        this.hasPartitionKeys = partitionChannelTypes.size() > 0;
-        TableSchema schema = partitioningHandle.getOriginalSchema();
-        this.projectionContext =
-                ThreadLocal.withInitial(
-                        () ->
-                                CodeGenUtils.newProjection(
-                                        schema.logicalPartitionType(), schema.partitionKeys()));
-        this.workerCount = workerCount;
-    }
-
-    @Override
-    public int getBucket(Page page, int position) {
-        if (!hasPartitionKeys) {
-            return 0;
-        } else {
-            TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
-            BinaryRow partition = projectionContext.get().apply(trinoRow);
-            return partition.hashCode() % workerCount;
-        }
-    }
-}

Reply via email to