This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 4ff1ecd Avoid using the same Projection object in different threads
(#91)
4ff1ecd is described below
commit 4ff1ecdcbc66dd9a614524cede98147be8610e75
Author: rfyu <[email protected]>
AuthorDate: Tue Nov 26 14:37:47 2024 +0800
Avoid using the same Projection object in different threads (#91)
---
.../apache/paimon/trino/FixedBucketTableShuffleFunction.java | 11 +++++++----
.../org/apache/paimon/trino/UnawareTableShuffleFunction.java | 11 +++++++----
2 files changed, 14 insertions(+), 8 deletions(-)
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
index 6732bcf..839ad0c 100644
---
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/FixedBucketTableShuffleFunction.java
@@ -43,7 +43,7 @@ public class FixedBucketTableShuffleFunction implements
BucketFunction {
private final int workerCount;
private final int bucketCount;
private final boolean isRowId;
- private final Projection pkProjection;
+ private final ThreadLocal<Projection> projectionContext;
public FixedBucketTableShuffleFunction(
List<Type> partitionChannelTypes,
@@ -51,8 +51,11 @@ public class FixedBucketTableShuffleFunction implements
BucketFunction {
int workerCount) {
TableSchema schema = partitioningHandle.getOriginalSchema();
- this.pkProjection =
- CodeGenUtils.newProjection(schema.logicalPrimaryKeysType(),
schema.primaryKeys());
+ this.projectionContext =
+ ThreadLocal.withInitial(
+ () ->
+ CodeGenUtils.newProjection(
+ schema.logicalPrimaryKeysType(),
schema.primaryKeys()));
this.bucketCount = new CoreOptions(schema.options()).bucket();
this.workerCount = workerCount;
this.isRowId =
@@ -78,7 +81,7 @@ public class FixedBucketTableShuffleFunction implements
BucketFunction {
}
TrinoRow trinoRow = new TrinoRow(page.getSingleValuePage(position),
RowKind.INSERT);
- BinaryRow pk = pkProjection.apply(trinoRow);
+ BinaryRow pk = projectionContext.get().apply(trinoRow);
int bucket =
KeyAndBucketExtractor.bucket(
KeyAndBucketExtractor.bucketKeyHashCode(pk),
bucketCount);
diff --git
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
index c9ed4be..cc6aa58 100644
---
a/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
+++
b/paimon-trino-440/src/main/java/org/apache/paimon/trino/UnawareTableShuffleFunction.java
@@ -37,7 +37,7 @@ public class UnawareTableShuffleFunction implements
BucketFunction {
private static final Logger LOG =
LoggerFactory.getLogger(UnawareTableShuffleFunction.class);
private final int workerCount;
private final boolean hasPartitionKeys;
- private final Projection partitionProjection;
+ private final ThreadLocal<Projection> projectionContext;
public UnawareTableShuffleFunction(
List<Type> partitionChannelTypes,
@@ -45,8 +45,11 @@ public class UnawareTableShuffleFunction implements
BucketFunction {
int workerCount) {
this.hasPartitionKeys = partitionChannelTypes.size() > 0;
TableSchema schema = partitioningHandle.getOriginalSchema();
- this.partitionProjection =
- CodeGenUtils.newProjection(schema.logicalPartitionType(),
schema.partitionKeys());
+ this.projectionContext =
+ ThreadLocal.withInitial(
+ () ->
+ CodeGenUtils.newProjection(
+ schema.logicalPartitionType(),
schema.partitionKeys()));
this.workerCount = workerCount;
}
@@ -56,7 +59,7 @@ public class UnawareTableShuffleFunction implements
BucketFunction {
return 0;
} else {
TrinoRow trinoRow = new
TrinoRow(page.getSingleValuePage(position), RowKind.INSERT);
- BinaryRow partition = partitionProjection.apply(trinoRow);
+ BinaryRow partition = projectionContext.get().apply(trinoRow);
return partition.hashCode() % workerCount;
}
}