This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0f28a5cc6f fix merging null multi value in partial upsert (#13031)
0f28a5cc6f is described below
commit 0f28a5cc6f58688040c6e20c458581a1077ed0cd
Author: rohit <[email protected]>
AuthorDate: Thu May 2 01:35:14 2024 +0530
fix merging null multi value in partial upsert (#13031)
---
.../pinot/segment/local/upsert/PartialUpsertHandler.java | 16 ++++++++++++++--
.../segment/local/upsert/PartialUpsertHandlerTest.java | 9 +++++++--
2 files changed, 21 insertions(+), 4 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 118412ab77..ad73de9d70 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -47,12 +48,24 @@ public class PartialUpsertHandler {
private final TreeMap<String, FieldSpec> _fieldSpecMap;
private final PartialUpsertMerger _partialUpsertMerger;
+ private final Map<String, Object> _defaultNullValues = new HashMap<>();
+
public PartialUpsertHandler(Schema schema, List<String> comparisonColumns,
UpsertConfig upsertConfig) {
_primaryKeyColumns = schema.getPrimaryKeyColumns();
_comparisonColumns = comparisonColumns;
_fieldSpecMap = schema.getFieldSpecMap();
_partialUpsertMerger =
PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns,
comparisonColumns, upsertConfig);
+ // cache default null values to handle null merger results
+ for (Map.Entry<String, FieldSpec> entry :
schema.getFieldSpecMap().entrySet()) {
+ String column = entry.getKey();
+ FieldSpec fieldSpec = entry.getValue();
+ if (fieldSpec.isSingleValueField()) {
+ _defaultNullValues.put(column, fieldSpec.getDefaultNullValue());
+ } else {
+ _defaultNullValues.put(column, new
Object[]{fieldSpec.getDefaultNullValue()});
+ }
+ }
}
public void merge(LazyRow previousRow, GenericRow newRow, Map<String,
Object> resultHolder) {
@@ -83,8 +96,7 @@ public class PartialUpsertHandler {
row.removeNullValueField(column);
row.putValue(column, mergedValue);
} else {
- // if column exists but mapped to a null value then merger result was a
null value
- row.putDefaultNullValue(column,
_fieldSpecMap.get(column).getDefaultNullValue());
+ row.putDefaultNullValue(column, _defaultNullValues.get(column));
}
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 4b954aa140..fc8fdbdefb 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -86,11 +86,14 @@ public class PartialUpsertHandlerTest {
newRowData.put("hoursSinceEpoch", null); // testing null comparison column
GenericRow newRecord = initGenericRow(new GenericRow(), newRowData);
LazyRow prevRecord = mock(LazyRow.class);
- mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set",
"hoursSinceEpoch", 2L));
- Map<String, Object> expectedData = new HashMap<>(Map.of("pk", "pk1",
"field2", "reset", "hoursSinceEpoch", 2L));
+ mockLazyRow(prevRecord,
+ Map.of("pk", "pk1", "field1", 5L, "field2", "set", "field3", new
Integer[]{0}, "hoursSinceEpoch", 2L));
+ Map<String, Object> expectedData = new HashMap<>(
+ Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
expectedData.put("field1", Long.MIN_VALUE);
GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData);
expectedRecord.addNullValueField("field1");
+ expectedRecord.putDefaultNullValue("field3", new
Object[]{Integer.MIN_VALUE});
testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger());
}
@@ -138,6 +141,7 @@ public class PartialUpsertHandlerTest {
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk",
FieldSpec.DataType.STRING)
.addSingleValueDimension("field1", FieldSpec.DataType.LONG)
.addSingleValueDimension("field2", FieldSpec.DataType.STRING)
+ .addMultiValueDimension("field3", FieldSpec.DataType.INT)
.addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG,
"1:HOURS:EPOCH", "1:HOURS")
.setPrimaryKeyColumns(Arrays.asList("pk")).build();
@@ -169,6 +173,7 @@ public class PartialUpsertHandlerTest {
}
if ((newRow.getValue("field2")).equals("reset")) {
resultHolder.put("field1", null);
+ resultHolder.put("field3", null);
}
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org