This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e1f652801 [core] Fix thread safety bug in Partial-update with agg (#3777)
e1f652801 is described below
commit e1f652801ad249375a986302517be01d865c9bf3
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 19:07:13 2024 +0800
[core] Fix thread safety bug in Partial-update with agg (#3777)
---
.../compact/PartialUpdateMergeFunction.java | 60 +++++++++++++---------
1 file changed, 36 insertions(+), 24 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index f1269119f..09549ec35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -50,6 +51,7 @@ import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static
org.apache.paimon.mergetree.compact.aggregate.FieldAggregator.createFieldAggregator;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
/**
@@ -268,9 +270,9 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private final List<DataType> tableTypes;
- private final Map<Integer, FieldsComparator> fieldSeqComparators;
+ private final Map<Integer, Supplier<FieldsComparator>>
fieldSeqComparators;
- private final Map<Integer, FieldAggregator> fieldAggregators;
+ private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
private final boolean removeRecordOnDelete;
@@ -296,8 +298,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
.map(fieldName ->
validateFieldName(fieldName, fieldNames))
.collect(Collectors.toList());
- UserDefinedSeqComparator userDefinedSeqComparator =
- UserDefinedSeqComparator.create(rowType,
sequenceFields);
+ Supplier<FieldsComparator> userDefinedSeqComparator =
+ () -> UserDefinedSeqComparator.create(rowType,
sequenceFields);
Arrays.stream(v.split(FIELDS_SEPARATOR))
.map(
fieldName ->
@@ -360,7 +362,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
RowType newRowType =
RowType.builder().fields(newDataTypes).build();
fieldSeqComparators.forEach(
- (field, comparator) -> {
+ (field, comparatorSupplier) -> {
+ FieldsComparator comparator =
comparatorSupplier.get();
int newField = indexMap.getOrDefault(field, -1);
if (newField != -1) {
int[] newSequenceFields =
@@ -390,7 +393,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
});
for (int i = 0; i < projects.length; i++) {
if (fieldAggregators.containsKey(projects[i])) {
- projectedAggregators.put(i,
fieldAggregators.get(projects[i]));
+ projectedAggregators.put(i,
fieldAggregators.get(projects[i]).get());
}
}
@@ -402,6 +405,12 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
!fieldSeqComparators.isEmpty(),
removeRecordOnDelete);
} else {
+ Map<Integer, FieldsComparator> fieldSeqComparators = new
HashMap<>();
+ this.fieldSeqComparators.forEach(
+ (f, supplier) -> fieldSeqComparators.put(f,
supplier.get()));
+ Map<Integer, FieldAggregator> fieldAggregators = new
HashMap<>();
+ this.fieldAggregators.forEach(
+ (f, supplier) -> fieldAggregators.put(f,
supplier.get()));
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
@@ -425,11 +434,12 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
int[] topProjects = Projection.of(projection).toTopLevelIndexes();
Set<Integer> indexSet =
Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
for (int index : topProjects) {
- FieldsComparator comparator = fieldSeqComparators.get(index);
- if (comparator == null) {
+ Supplier<FieldsComparator> comparatorSupplier =
fieldSeqComparators.get(index);
+ if (comparatorSupplier == null) {
continue;
}
+ FieldsComparator comparator = comparatorSupplier.get();
for (int field : comparator.compareFields()) {
if (!indexSet.contains(field)) {
extraFields.add(field);
@@ -464,12 +474,12 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
*
* @return The aggregators for each column.
*/
- private Map<Integer, FieldAggregator> createFieldAggregators(
+ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
RowType rowType, List<String> primaryKeys, CoreOptions
options) {
List<String> fieldNames = rowType.getFieldNames();
List<DataType> fieldTypes = rowType.getFieldTypes();
- Map<Integer, FieldAggregator> fieldAggregators = new HashMap<>();
+ Map<Integer, Supplier<FieldAggregator>> fieldAggregators = new
HashMap<>();
String defaultAggFunc = options.fieldsDefaultFunc();
for (int i = 0; i < fieldNames.size(); i++) {
String fieldName = fieldNames.get(i);
@@ -482,23 +492,25 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
if (strAggFunc != null) {
fieldAggregators.put(
i,
- FieldAggregator.createFieldAggregator(
- fieldType,
- strAggFunc,
- ignoreRetract,
- isPrimaryKey,
- options,
- fieldName));
+ () ->
+ createFieldAggregator(
+ fieldType,
+ strAggFunc,
+ ignoreRetract,
+ isPrimaryKey,
+ options,
+ fieldName));
} else if (defaultAggFunc != null) {
fieldAggregators.put(
i,
- FieldAggregator.createFieldAggregator(
- fieldType,
- defaultAggFunc,
- ignoreRetract,
- isPrimaryKey,
- options,
- fieldName));
+ () ->
+ createFieldAggregator(
+ fieldType,
+ defaultAggFunc,
+ ignoreRetract,
+ isPrimaryKey,
+ options,
+ fieldName));
}
}
return fieldAggregators;