This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ff1ecd  Avoid using the same Projection object in different threads 
(#91)
4ff1ecd is described below

commit 4ff1ecdcbc66dd9a614524cede98147be8610e75
Author: rfyu <[email protected]>
AuthorDate: Tue Nov 26 14:37:47 2024 +0800

    Avoid using the same Projection object in different threads (#91)
---
 .../apache/paimon/trino/FixedBucketTableShuffleFunction.java  | 11 +++++++----
 .../org/apache/paimon/trino/UnawareTableShuffleFunction.java  | 11 +++++++----
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
index 6732bcf..839ad0c 100644
--- 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
@@ -43,7 +43,7 @@ public class FixedBucketTableShuffleFunction implements 
BucketFunction {
     private final int workerCount;
     private final int bucketCount;
     private final boolean isRowId;
-    private final Projection pkProjection;
+    private final ThreadLocal<Projection> projectionContext;
 
     public FixedBucketTableShuffleFunction(
             List<Type> partitionChannelTypes,
@@ -51,8 +51,11 @@ public class FixedBucketTableShuffleFunction implements 
BucketFunction {
             int workerCount) {
 
         TableSchema schema = partitioningHandle.getOriginalSchema();
-        this.pkProjection =
-                CodeGenUtils.newProjection(schema.logicalPrimaryKeysType(), 
schema.primaryKeys());
+        this.projectionContext =
+                ThreadLocal.withInitial(
+                        () ->
+                                CodeGenUtils.newProjection(
+                                        schema.logicalPrimaryKeysType(), 
schema.primaryKeys()));
         this.bucketCount = new CoreOptions(schema.options()).bucket();
         this.workerCount = workerCount;
         this.isRowId =
@@ -78,7 +81,7 @@ public class FixedBucketTableShuffleFunction implements 
BucketFunction {
         }
 
         TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position), 
RowKind.INSERT);
-        BinaryRow pk = pkProjection.apply(trinoRow);
+        BinaryRow pk = projectionContext.get().apply(trinoRow);
         int bucket =
                 KeyAndBucketExtractor.bucket(
                         KeyAndBucketExtractor.bucketKeyHashCode(pk), 
bucketCount);
diff --git 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
index c9ed4be..cc6aa58 100644
--- 
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
+++ 
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
@@ -37,7 +37,7 @@ public class UnawareTableShuffleFunction implements 
BucketFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
     private final int workerCount;
     private final boolean hasPartitionKeys;
-    private final Projection partitionProjection;
+    private final ThreadLocal<Projection> projectionContext;
 
     public UnawareTableShuffleFunction(
             List<Type> partitionChannelTypes,
@@ -45,8 +45,11 @@ public class UnawareTableShuffleFunction implements 
BucketFunction {
             int workerCount) {
         this.hasPartitionKeys = partitionChannelTypes.size() > 0;
         TableSchema schema = partitioningHandle.getOriginalSchema();
-        this.partitionProjection =
-                CodeGenUtils.newProjection(schema.logicalPartitionType(), 
schema.partitionKeys());
+        this.projectionContext =
+                ThreadLocal.withInitial(
+                        () ->
+                                CodeGenUtils.newProjection(
+                                        schema.logicalPartitionType(), 
schema.partitionKeys()));
         this.workerCount = workerCount;
     }
 
@@ -56,7 +59,7 @@ public class UnawareTableShuffleFunction implements 
BucketFunction {
             return 0;
         } else {
             TrinoRow trinoRow = new 
TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
-            BinaryRow partition = partitionProjection.apply(trinoRow);
+            BinaryRow partition = projectionContext.get().apply(trinoRow);
             return partition.hashCode() % workerCount;
         }
     }

Reply via email to