This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f652801 [core] Fix thread safety bug in Partial-update with agg (#3777)
e1f652801 is described below

commit e1f652801ad249375a986302517be01d865c9bf3
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 19:07:13 2024 +0800

    [core] Fix thread safety bug in Partial-update with agg (#3777)
---
 .../compact/PartialUpdateMergeFunction.java        | 60 +++++++++++++---------
 1 file changed, 36 insertions(+), 24 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index f1269119f..09549ec35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -50,6 +51,7 @@ import java.util.stream.Stream;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
 import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static 
org.apache.paimon.mergetree.compact.aggregate.FieldAggregator.createFieldAggregator;
 import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
 
 /**
@@ -268,9 +270,9 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
         private final List<DataType> tableTypes;
 
-        private final Map<Integer, FieldsComparator> fieldSeqComparators;
+        private final Map<Integer, Supplier<FieldsComparator>> 
fieldSeqComparators;
 
-        private final Map<Integer, FieldAggregator> fieldAggregators;
+        private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
 
         private final boolean removeRecordOnDelete;
 
@@ -296,8 +298,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                     .map(fieldName -> 
validateFieldName(fieldName, fieldNames))
                                     .collect(Collectors.toList());
 
-                    UserDefinedSeqComparator userDefinedSeqComparator =
-                            UserDefinedSeqComparator.create(rowType, 
sequenceFields);
+                    Supplier<FieldsComparator> userDefinedSeqComparator =
+                            () -> UserDefinedSeqComparator.create(rowType, 
sequenceFields);
                     Arrays.stream(v.split(FIELDS_SEPARATOR))
                             .map(
                                     fieldName ->
@@ -360,7 +362,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 RowType newRowType = 
RowType.builder().fields(newDataTypes).build();
 
                 fieldSeqComparators.forEach(
-                        (field, comparator) -> {
+                        (field, comparatorSupplier) -> {
+                            FieldsComparator comparator = 
comparatorSupplier.get();
                             int newField = indexMap.getOrDefault(field, -1);
                             if (newField != -1) {
                                 int[] newSequenceFields =
@@ -390,7 +393,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                         });
                 for (int i = 0; i < projects.length; i++) {
                     if (fieldAggregators.containsKey(projects[i])) {
-                        projectedAggregators.put(i, 
fieldAggregators.get(projects[i]));
+                        projectedAggregators.put(i, 
fieldAggregators.get(projects[i]).get());
                     }
                 }
 
@@ -402,6 +405,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                         !fieldSeqComparators.isEmpty(),
                         removeRecordOnDelete);
             } else {
+                Map<Integer, FieldsComparator> fieldSeqComparators = new 
HashMap<>();
+                this.fieldSeqComparators.forEach(
+                        (f, supplier) -> fieldSeqComparators.put(f, 
supplier.get()));
+                Map<Integer, FieldAggregator> fieldAggregators = new 
HashMap<>();
+                this.fieldAggregators.forEach(
+                        (f, supplier) -> fieldAggregators.put(f, 
supplier.get()));
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
                         ignoreDelete,
@@ -425,11 +434,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             int[] topProjects = Projection.of(projection).toTopLevelIndexes();
             Set<Integer> indexSet = 
Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
             for (int index : topProjects) {
-                FieldsComparator comparator = fieldSeqComparators.get(index);
-                if (comparator == null) {
+                Supplier<FieldsComparator> comparatorSupplier = 
fieldSeqComparators.get(index);
+                if (comparatorSupplier == null) {
                     continue;
                 }
 
+                FieldsComparator comparator = comparatorSupplier.get();
                 for (int field : comparator.compareFields()) {
                     if (!indexSet.contains(field)) {
                         extraFields.add(field);
@@ -464,12 +474,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
          *
          * @return The aggregators for each column.
          */
-        private Map<Integer, FieldAggregator> createFieldAggregators(
+        private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
                 RowType rowType, List<String> primaryKeys, CoreOptions 
options) {
 
             List<String> fieldNames = rowType.getFieldNames();
             List<DataType> fieldTypes = rowType.getFieldTypes();
-            Map<Integer, FieldAggregator> fieldAggregators = new HashMap<>();
+            Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new 
HashMap<>();
             String defaultAggFunc = options.fieldsDefaultFunc();
             for (int i = 0; i < fieldNames.size(); i++) {
                 String fieldName = fieldNames.get(i);
@@ -482,23 +492,25 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 if (strAggFunc != null) {
                     fieldAggregators.put(
                             i,
-                            FieldAggregator.createFieldAggregator(
-                                    fieldType,
-                                    strAggFunc,
-                                    ignoreRetract,
-                                    isPrimaryKey,
-                                    options,
-                                    fieldName));
+                            () ->
+                                    createFieldAggregator(
+                                            fieldType,
+                                            strAggFunc,
+                                            ignoreRetract,
+                                            isPrimaryKey,
+                                            options,
+                                            fieldName));
                 } else if (defaultAggFunc != null) {
                     fieldAggregators.put(
                             i,
-                            FieldAggregator.createFieldAggregator(
-                                    fieldType,
-                                    defaultAggFunc,
-                                    ignoreRetract,
-                                    isPrimaryKey,
-                                    options,
-                                    fieldName));
+                            () ->
+                                    createFieldAggregator(
+                                            fieldType,
+                                            defaultAggFunc,
+                                            ignoreRetract,
+                                            isPrimaryKey,
+                                            options,
+                                            fieldName));
                 }
             }
             return fieldAggregators;

Reply via email to