This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 51698e9b29 [Clean up] Simplify UpsertConfig (#9150)
51698e9b29 is described below

commit 51698e9b298931dc21a91c80135b436b41215ab0
Author: deemoliu <[email protected]>
AuthorDate: Wed Aug 3 17:29:22 2022 -0700

    [Clean up] Simplify UpsertConfig (#9150)
---
 .../common/utils/config/TableConfigSerDeTest.java  |  4 +-
 .../controller/helix/PinotResourceManagerTest.java |  2 +-
 .../tests/BaseClusterIntegrationTest.java          |  2 +-
 .../MutableSegmentImplUpsertComparisonColTest.java | 11 ++--
 .../mutable/MutableSegmentImplUpsertTest.java      | 10 ++--
 .../segment/local/utils/TableConfigUtilsTest.java  | 63 ++++++++++------------
 .../pinot/spi/config/table/UpsertConfig.java       | 53 +++++++++++++++---
 .../pinot/spi/config/table/UpsertConfigTest.java   | 13 +++--
 8 files changed, 97 insertions(+), 61 deletions(-)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 2b3e01e5b2..a4e15c4d22 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -250,9 +250,7 @@ public class TableConfigSerDeTest {
     }
     {
       // with upsert config
-      UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL, 
null, null, "comparison", HashFunction.NONE);
-
-      TableConfig tableConfig = 
tableConfigBuilder.setUpsertConfig(upsertConfig).build();
+      TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).build();
 
       // Serialize then de-serialize
       
checkTableConfigWithUpsertConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
 TableConfig.class));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 204a2ff7dd..6e4230c39a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
     
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
     realtimeTableConfig.getValidationConfig()
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    realtimeTableConfig.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null));
+    realtimeTableConfig.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL));
     TEST_INSTANCE.getHelixResourceManager().addTable(realtimeTableConfig);
   }
 
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 8308435a47..21535e5ebe 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -397,7 +397,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
   }
 
   /**
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 60e336b7ec..08cbf3e91c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -58,8 +58,11 @@ public class MutableSegmentImplUpsertComparisonColTest {
     URL schemaResourceUrl = 
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
-    _tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
"offset", null)).build();
+    UpsertConfig offsetUpsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    offsetUpsertConfig.setComparisonColumn("offset");
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(offsetUpsertConfig)
+            .build();
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
@@ -67,8 +70,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
             HashFunction.NONE, null, 
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), false, true, new 
UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null),
-            "secondsSinceEpoch", _partitionUpsertMetadataManager, null);
+            Collections.emptySet(), false, true, offsetUpsertConfig, 
"secondsSinceEpoch",
+            _partitionUpsertMetadataManager, null);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
         _schema.getColumnNames(), null)) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a603303bc8..015ba5705a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -56,8 +56,11 @@ public class MutableSegmentImplUpsertTest {
     URL schemaResourceUrl = 
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
-    _tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, hashFunction)).build();
+    UpsertConfig upsertConfigWithHash = new 
UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfigWithHash.setHashFunction(hashFunction);
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+            .build();
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
@@ -65,8 +68,7 @@ public class MutableSegmentImplUpsertTest {
             hashFunction, null, 
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, 
Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), false, true,
-            new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, 
hashFunction), "secondsSinceEpoch",
+            Collections.emptySet(), false, true, upsertConfigWithHash, 
"secondsSinceEpoch",
             _partitionUpsertMetadataManager, null);
 
     GenericRow reuse = new GenericRow();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9bca0ff9ce..41ede3df4d 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1227,8 +1227,7 @@ public class TableConfigUtilsTest {
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setDedupConfig(new DedupConfig(true, HashFunction.NONE))
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
-        .setStreamConfigs(streamConfigs).build();
+        .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       Assert.fail();
@@ -1242,8 +1241,9 @@ public class TableConfigUtilsTest {
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .build();
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null)).build();
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       Assert.fail();
@@ -1251,8 +1251,8 @@ public class TableConfigUtilsTest {
       Assert.assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime 
table only.");
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null)).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       Assert.fail();
@@ -1272,8 +1272,7 @@ public class TableConfigUtilsTest {
     }
 
     Map<String, String> streamConfigs = getStreamConfigs();
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1283,8 +1282,7 @@ public class TableConfigUtilsTest {
     }
 
     streamConfigs.put("stream.kafka.consumer.type", "simple");
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1294,8 +1292,7 @@ public class TableConfigUtilsTest {
           "Upsert/Dedup table must use strict replica-group (i.e. 
strictReplicaGroup) based routing");
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1303,8 +1300,7 @@ public class TableConfigUtilsTest {
     StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Lists.newArrayList("myCol"), null,
         Collections.singletonList(
             new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), 10);
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
@@ -1315,8 +1311,7 @@ public class TableConfigUtilsTest {
     }
 
     //With Aggregate Metrics
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).setAggregateMetrics(true).build();
     try {
@@ -1329,8 +1324,7 @@ public class TableConfigUtilsTest {
     //With aggregation Configs in Ingestion Config
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("twiceSum", "SUM(twice)")));
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         
.setStreamConfigs(streamConfigs).setIngestionConfig(ingestionConfig).build();
     try {
@@ -1341,8 +1335,7 @@ public class TableConfigUtilsTest {
     }
 
     //With aggregation Configs in Ingestion Config and IndexingConfig at the 
same time
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null))
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).setIngestionConfig(ingestionConfig).build();
     try {
@@ -1366,12 +1359,15 @@ public class TableConfigUtilsTest {
     streamConfigs.put("stream.kafka.consumer.type", "simple");
     Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new 
HashMap<>();
     partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.IGNORE);
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(
-            new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE,
-                "myCol2",
-                null)).setNullHandlingEnabled(true)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-        .setStreamConfigs(streamConfigs).build();
+    UpsertConfig partialUpsertConfig = new 
UpsertConfig(UpsertConfig.Mode.PARTIAL);
+    partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
+    
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+    partialUpsertConfig.setComparisonColumn("myCol2");
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(partialUpsertConfig)
+            .setNullHandlingEnabled(true)
+            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
       Assert.fail();
@@ -1379,10 +1375,11 @@ public class TableConfigUtilsTest {
       Assert.assertEquals(e.getMessage(), "Merger cannot be applied to 
comparison column");
     }
 
+    partialUpsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+    partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
+    
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("myCol2")
-        .setUpsertConfig(
-            new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
-                null)).setNullHandlingEnabled(true)
+        .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     try {
@@ -1394,9 +1391,7 @@ public class TableConfigUtilsTest {
 
     partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeCol")
-        .setUpsertConfig(
-            new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
-                null)).setNullHandlingEnabled(false)
+        .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     try {
@@ -1483,8 +1478,8 @@ public class TableConfigUtilsTest {
 
     // invalid Upsert config with RealtimeToOfflineTask
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null, null)).setTaskConfig(
-            new 
TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
+        .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig(
+            ImmutableMap.of("RealtimeToOfflineSegmentsTask", 
realtimeToOfflineTaskConfig,
                 "SegmentGenerationAndPushTask", 
segmentGenerationAndPushTaskConfig))).build();
     try {
       TableConfigUtils.validateTaskConfigs(tableConfig, schema);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 947e5a86b4..f687be7501 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.spi.config.table;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import com.google.common.base.Preconditions;
@@ -41,21 +40,21 @@ public class UpsertConfig extends BaseJsonConfig {
   }
 
   @JsonPropertyDescription("Upsert mode.")
-  private final Mode _mode;
+  private Mode _mode;
 
   @JsonPropertyDescription("Function to hash the primary key.")
-  private final HashFunction _hashFunction;
+  private HashFunction _hashFunction = HashFunction.NONE;
 
   @JsonPropertyDescription("Partial update strategies.")
-  private final Map<String, Strategy> _partialUpsertStrategies;
+  private Map<String, Strategy> _partialUpsertStrategies;
 
   @JsonPropertyDescription("default upsert strategy for partial mode")
-  private final Strategy _defaultPartialUpsertStrategy;
+  private Strategy _defaultPartialUpsertStrategy;
 
   @JsonPropertyDescription("Column for upsert comparison, default to time 
column")
-  private final String _comparisonColumn;
+  private String _comparisonColumn;
 
-  @JsonCreator
+  @Deprecated
   public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
       @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> 
partialUpsertStrategies,
       @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy 
defaultPartialUpsertStrategy,
@@ -77,6 +76,14 @@ public class UpsertConfig extends BaseJsonConfig {
     _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
   }
 
+  public UpsertConfig(Mode mode) {
+    _mode = mode;
+  }
+
+  // Do not use this constructor. This is needed for JSON deserialization.
+  public UpsertConfig() {
+  }
+
   public Mode getMode() {
     return _mode;
   }
@@ -97,4 +104,36 @@ public class UpsertConfig extends BaseJsonConfig {
   public String getComparisonColumn() {
     return _comparisonColumn;
   }
+
+  public void setHashFunction(HashFunction hashFunction) {
+    _hashFunction = hashFunction;
+  }
+
+  /**
+   * PartialUpsertStrategies maintains the mapping of merge strategies per 
column.
+   * Each key in the map is a columnName, value is a partial upsert merging 
strategy.
+   * Supported strategies are {OVERWRITE|INCREMENT|APPEND|UNION|IGNORE}.
+   */
+  public void setPartialUpsertStrategies(Map<String, Strategy> 
partialUpsertStrategies) {
+    _partialUpsertStrategies = partialUpsertStrategies;
+  }
+
+  /**
+   * If strategy is not specified for a column, the merger on that column will 
be "defaultPartialUpsertStrategy".
+   * The default value of defaultPartialUpsertStrategy is OVERWRITE.
+   */
+  public void setDefaultPartialUpsertStrategy(Strategy 
defaultPartialUpsertStrategy) {
+    _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy;
+  }
+
+  /**
+   * By default, Pinot uses the value in the time column to determine the 
latest record. For two records with the
+   * same primary key, the record with the larger value of the time column is 
picked as the
+   * latest update.
+   * However, there are cases when users need to use another column to 
determine the order.
+   * In such case, you can use option comparisonColumn to override the column 
used for comparison.
+   */
+  public void setComparisonColumn(String comparisonColumn) {
+    _comparisonColumn = comparisonColumn;
+  }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 37f96eb321..df5875fe33 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,21 +29,20 @@ public class UpsertConfigTest {
 
   @Test
   public void testUpsertConfig() {
-    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, 
null, null, null, null);
+    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL);
     assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
 
-    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
"comparison", null);
+    upsertConfig1.setComparisonColumn("comparison");
     assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
 
-    upsertConfig1 =
-        new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", 
HashFunction.MURMUR3);
+    upsertConfig1.setHashFunction(HashFunction.MURMUR3);
     assertEquals(upsertConfig1.getHashFunction(), HashFunction.MURMUR3);
 
+    UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
     Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new 
HashMap<>();
     partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
-    UpsertConfig upsertConfig2 =
-        new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, 
UpsertConfig.Strategy.OVERWRITE, null,
-            null);
+    upsertConfig2.setPartialUpsertStrategies(partialUpsertStratgies);
+    
upsertConfig2.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
     assertEquals(upsertConfig2.getPartialUpsertStrategies(), 
partialUpsertStratgies);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to