This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f28a5cc6f fix merging null multi value in partial upsert (#13031)
0f28a5cc6f is described below

commit 0f28a5cc6f58688040c6e20c458581a1077ed0cd
Author: rohit <[email protected]>
AuthorDate: Thu May 2 01:35:14 2024 +0530

    fix merging null multi value in partial upsert (#13031)
---
 .../pinot/segment/local/upsert/PartialUpsertHandler.java | 16 ++++++++++++++--
 .../segment/local/upsert/PartialUpsertHandlerTest.java   |  9 +++++++--
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 118412ab77..ad73de9d70 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -47,12 +48,24 @@ public class PartialUpsertHandler {
   private final TreeMap<String, FieldSpec> _fieldSpecMap;
   private final PartialUpsertMerger _partialUpsertMerger;
 
+  private final Map<String, Object> _defaultNullValues = new HashMap<>();
+
   public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
     _comparisonColumns = comparisonColumns;
     _fieldSpecMap = schema.getFieldSpecMap();
     _partialUpsertMerger =
         PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
+    // cache default null values to handle null merger results
+    for (Map.Entry<String, FieldSpec> entry : schema.getFieldSpecMap().entrySet()) {
+      String column = entry.getKey();
+      FieldSpec fieldSpec = entry.getValue();
+      if (fieldSpec.isSingleValueField()) {
+        _defaultNullValues.put(column, fieldSpec.getDefaultNullValue());
+      } else {
+        _defaultNullValues.put(column, new Object[]{fieldSpec.getDefaultNullValue()});
+      }
+    }
   }
 
   public void merge(LazyRow previousRow, GenericRow newRow, Map<String, Object> resultHolder) {
@@ -83,8 +96,7 @@ public class PartialUpsertHandler {
       row.removeNullValueField(column);
       row.putValue(column, mergedValue);
     } else {
-      // if column exists but mapped to a null value then merger result was a null value
-      row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue());
+      row.putDefaultNullValue(column, _defaultNullValues.get(column));
     }
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 4b954aa140..fc8fdbdefb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -86,11 +86,14 @@ public class PartialUpsertHandlerTest {
     newRowData.put("hoursSinceEpoch", null); // testing null comparison column
     GenericRow newRecord = initGenericRow(new GenericRow(), newRowData);
     LazyRow prevRecord = mock(LazyRow.class);
-    mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L));
-    Map<String, Object> expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
+    mockLazyRow(prevRecord,
+        Map.of("pk", "pk1", "field1", 5L, "field2", "set", "field3", new Integer[]{0}, "hoursSinceEpoch", 2L));
+    Map<String, Object> expectedData = new HashMap<>(
+        Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
     expectedData.put("field1", Long.MIN_VALUE);
     GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData);
     expectedRecord.addNullValueField("field1");
+    expectedRecord.putDefaultNullValue("field3", new Object[]{Integer.MIN_VALUE});
 
     testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger());
   }
@@ -138,6 +141,7 @@ public class PartialUpsertHandlerTest {
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
         .addSingleValueDimension("field1", FieldSpec.DataType.LONG)
         .addSingleValueDimension("field2", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("field3", FieldSpec.DataType.INT)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
         .setPrimaryKeyColumns(Arrays.asList("pk")).build();
 
@@ -169,6 +173,7 @@ public class PartialUpsertHandlerTest {
       }
       if ((newRow.getValue("field2")).equals("reset")) {
         resultHolder.put("field1", null);
+        resultHolder.put("field3", null);
       }
     };
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to