Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1520578347


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -51,6 +51,9 @@ public enum Strategy {
   @JsonPropertyDescription("default upsert strategy for partial mode")
   private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE;
 
+  @JsonPropertyDescription("Class name for custom row merger implementation")
+  private String _rowMergerCustomImplementation;

Review Comment:
   Suggest renaming it to `partialUpsertMergerClass`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java:
##########
@@ -18,13 +18,28 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+import java.util.Map;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Merger previously persisted row with the new incoming row.
+ * <p>
+ * Implement this interface to define logic to merge rows. {@link LazyRow} 
provides abstraction row like abstraction
+ * to read previously persisted row by lazily loading column values if needed. 
For automatic plugging of the
+ * interface via {@link 
org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory}
+ * implement {@link BasePartialUpsertMerger}
+ */
 public interface PartialUpsertMerger {
+
   /**
-   * Handle partial upsert merge.
-   *
-   * @param previousValue the value of given field from the last derived full 
record during ingestion.
-   * @param currentValue the value of given field from the new consumed record.
-   * @return a new value after merge
+   * Merge previous row with new incoming row and persist the merged results 
per column in the provided
+   * mergerResult map. {@link 
org.apache.pinot.segment.local.upsert.PartialUpsertHandler} ensures the primary 
key and
+   * comparison columns are not modified, comparison columns are merged and 
only the latest non values are stored.
+   * @param prevRecord
+   * @param newRecord
+   * @param mergerResult
    */
-  Object merge(Object previousValue, Object currentValue);
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, 
Object> mergerResult);

Review Comment:
   (minot) Remove `public`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -18,89 +18,82 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
-import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
+import 
org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import 
org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger;
+import 
org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous 
record.
+ * It uses the configured merge strategies to merge the columns. If no merge 
strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by 
implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column 
mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new 
HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final PartialUpsertColumnMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, 
UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> 
comparisonColumns) {
-    _defaultPartialUpsertMerger = 
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, 
UpsertConfig upsertConfig) {
+    _defaultPartialUpsertMerger =
+        
PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : 
partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), 
PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, 
comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For 
these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the 
prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, 
regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the 
new value is not null.
-   * Null values will override existing values if not configured. They can be 
ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column 
values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, 
Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, 
_defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and 
use the new value
-        // (2) Else If the value of new value is null, use the previous value 
(even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the 
merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous 
_comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function 
to it, rather we just take any/all
-              // non-null comparison column values from the previous record, 
and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, 
newRecord.getValue(column)));
-            }
-          }
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value 
if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // use merged column value from result map
+      if (reuseMergerResult.containsKey(column)) {
+        Object mergedValue = reuseMergerResult.get(column);
+        if (mergedValue != null) {
+          // remove null value field if it was set
+          newRecord.removeNullValueField(column);
+          newRecord.putValue(column, mergedValue);
         } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not 
null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use 
the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+          // if column exists but mapped to a null value then merger result 
was a null value
+          newRecord.addNullValueField(column);
+          newRecord.putValue(column, null);
         }
+      } else if (!(_partialUpsertMerger instanceof 
PartialUpsertColumnarMerger)) {
+        // PartialUpsertColumnMerger already handles default merger but for 
any custom implementations
+        // non merged columns need to be applied with default merger
+        newRecord.putValue(column,
+            _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), 
newRecord.getValue(column)));

Review Comment:
   The default strategy is columnar based. Do we want to apply it for other 
mergers? I feel it is more intuitive if all the merge logic is handled within 
the merger



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -18,89 +18,82 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
-import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
+import 
org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import 
org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger;
+import 
org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous 
record.
+ * It uses the configured merge strategies to merge the columns. If no merge 
strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by 
implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column 
mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new 
HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final PartialUpsertColumnMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, 
UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> 
comparisonColumns) {
-    _defaultPartialUpsertMerger = 
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, 
UpsertConfig upsertConfig) {
+    _defaultPartialUpsertMerger =
+        
PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : 
partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), 
PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, 
comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For 
these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the 
prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, 
regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the 
new value is not null.
-   * Null values will override existing values if not configured. They can be 
ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column 
values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, 
Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, 
_defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and 
use the new value
-        // (2) Else If the value of new value is null, use the previous value 
(even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the 
merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous 
_comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function 
to it, rather we just take any/all
-              // non-null comparison column values from the previous record, 
and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, 
newRecord.getValue(column)));
-            }
-          }
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value 
if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // use merged column value from result map
+      if (reuseMergerResult.containsKey(column)) {
+        Object mergedValue = reuseMergerResult.get(column);
+        if (mergedValue != null) {
+          // remove null value field if it was set
+          newRecord.removeNullValueField(column);
+          newRecord.putValue(column, mergedValue);
         } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not 
null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use 
the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+          // if column exists but mapped to a null value then merger result 
was a null value
+          newRecord.addNullValueField(column);
+          newRecord.putValue(column, null);
         }
+      } else if (!(_partialUpsertMerger instanceof 
PartialUpsertColumnarMerger)) {
+        // PartialUpsertColumnMerger already handles default merger but for 
any custom implementations
+        // non merged columns need to be applied with default merger
+        newRecord.putValue(column,
+            _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), 
newRecord.getValue(column)));

Review Comment:
   If we do not handle default merge here, we may iterate over the merger 
result to reduce map lookups



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to