This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 74e13b1aa0 Merge new columns in existing record with default merge strategy (#9851)
74e13b1aa0 is described below
commit 74e13b1aa064e13aebba060ec1d292689721ff91
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Nov 29 10:02:55 2022 -0800
Merge new columns in existing record with default merge strategy (#9851)
---
.../segment/local/upsert/PartialUpsertHandler.java | 36 ++++++++++++----------
.../local/upsert/PartialUpsertHandlerTest.java | 5 +--
2 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 4a1cfad39f..3444a5ac54 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
@@ -33,20 +34,19 @@ import org.apache.pinot.spi.data.readers.GenericRow;
public class PartialUpsertHandler {
// _column2Mergers maintains the mapping of merge strategies per columns.
private final Map<String, PartialUpsertMerger> _column2Mergers = new
HashMap<>();
+ private final PartialUpsertMerger _defaultPartialUpsertMerger;
+ private final String _comparisonColumn;
+ private final List<String> _primaryKeyColumns;
public PartialUpsertHandler(Schema schema, Map<String,
UpsertConfig.Strategy> partialUpsertStrategies,
UpsertConfig.Strategy defaultPartialUpsertStrategy, String
comparisonColumn) {
+ _defaultPartialUpsertMerger =
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+ _comparisonColumn = comparisonColumn;
+ _primaryKeyColumns = schema.getPrimaryKeyColumns();
+
for (Map.Entry<String, UpsertConfig.Strategy> entry :
partialUpsertStrategies.entrySet()) {
_column2Mergers.put(entry.getKey(),
PartialUpsertMergerFactory.getMerger(entry.getValue()));
}
- // For all physical columns (including date time columns) except for
primary key columns and comparison column.
- // If no comparison column is configured, use main time column as the
comparison time.
- for (String columnName : schema.getPhysicalColumnNames()) {
- if (!schema.getPrimaryKeyColumns().contains(columnName) &&
!_column2Mergers.containsKey(columnName)
- && !comparisonColumn.equals(columnName)) {
- _column2Mergers.put(columnName,
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
- }
- }
}
/**
@@ -65,15 +65,17 @@ public class PartialUpsertHandler {
* @return a new row after merge
*/
public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
- for (Map.Entry<String, PartialUpsertMerger> entry :
_column2Mergers.entrySet()) {
- String column = entry.getKey();
- if (!previousRecord.isNullValue(column)) {
- if (newRecord.isNullValue(column)) {
- newRecord.putValue(column, previousRecord.getValue(column));
- newRecord.removeNullValueField(column);
- } else {
- newRecord.putValue(column,
- entry.getValue().merge(previousRecord.getValue(column),
newRecord.getValue(column)));
+ for (String column : previousRecord.getFieldToValueMap().keySet()) {
+ if (!_primaryKeyColumns.contains(column) &&
!_comparisonColumn.equals(column)) {
+ if (!previousRecord.isNullValue(column)) {
+ if (newRecord.isNullValue(column)) {
+ newRecord.putValue(column, previousRecord.getValue(column));
+ newRecord.removeNullValueField(column);
+ } else {
+ PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
+ newRecord.putValue(column,
+ merger.merge(previousRecord.getValue(column),
newRecord.getValue(column)));
+ }
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 31a508e988..913215c85d 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -66,7 +66,7 @@ public class PartialUpsertHandlerTest {
// newRecord is default null value, while previousRecord is not.
// field1 should not be incremented since the newRecord is null.
- // special case: field2 should be overrided by null value because we
didn't enabled default partial upsert strategy.
+ // special case: field2 should be merged based on default partial upsert
strategy.
previousRecord.clear();
incomingRecord.clear();
previousRecord.putValue("field1", 1);
@@ -76,7 +76,8 @@ public class PartialUpsertHandlerTest {
newRecord = handler.merge(previousRecord, incomingRecord);
assertFalse(newRecord.isNullValue("field1"));
assertEquals(newRecord.getValue("field1"), 1);
- assertTrue(newRecord.isNullValue("field2"));
+ assertFalse(newRecord.isNullValue("field2"));
+ assertEquals(newRecord.getValue("field2"), 2);
// neither of records is null.
previousRecord.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]