This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 51698e9b29 [Clean up] Simplify UpsertConfig (#9150)
51698e9b29 is described below
commit 51698e9b298931dc21a91c80135b436b41215ab0
Author: deemoliu <[email protected]>
AuthorDate: Wed Aug 3 17:29:22 2022 -0700
[Clean up] Simplify UpsertConfig (#9150)
---
.../common/utils/config/TableConfigSerDeTest.java | 4 +-
.../controller/helix/PinotResourceManagerTest.java | 2 +-
.../tests/BaseClusterIntegrationTest.java | 2 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 11 ++--
.../mutable/MutableSegmentImplUpsertTest.java | 10 ++--
.../segment/local/utils/TableConfigUtilsTest.java | 63 ++++++++++------------
.../pinot/spi/config/table/UpsertConfig.java | 53 +++++++++++++++---
.../pinot/spi/config/table/UpsertConfigTest.java | 13 +++--
8 files changed, 97 insertions(+), 61 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 2b3e01e5b2..a4e15c4d22 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -250,9 +250,7 @@ public class TableConfigSerDeTest {
}
{
// with upsert config
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL,
null, null, "comparison", HashFunction.NONE);
-
- TableConfig tableConfig =
tableConfigBuilder.setUpsertConfig(upsertConfig).build();
+ TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).build();
// Serialize then de-serialize
checkTableConfigWithUpsertConfig(JsonUtils.stringToObject(tableConfig.toJsonString(),
TableConfig.class));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 204a2ff7dd..6e4230c39a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
realtimeTableConfig.getValidationConfig()
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- realtimeTableConfig.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL, null, null, null, null));
+ realtimeTableConfig.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL));
TEST_INSTANCE.getHelixResourceManager().addTable(realtimeTableConfig);
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 8308435a47..21535e5ebe 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -397,7 +397,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
}
/**
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 60e336b7ec..08cbf3e91c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -58,8 +58,11 @@ public class MutableSegmentImplUpsertComparisonColTest {
URL schemaResourceUrl =
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl =
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
- _tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
"offset", null)).build();
+ UpsertConfig offsetUpsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ offsetUpsertConfig.setComparisonColumn("offset");
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(offsetUpsertConfig)
+ .build();
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
@@ -67,8 +70,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
HashFunction.NONE, null,
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
- Collections.emptySet(), false, true, new
UpsertConfig(UpsertConfig.Mode.FULL, null, null, "offset", null),
- "secondsSinceEpoch", _partitionUpsertMetadataManager, null);
+ Collections.emptySet(), false, true, offsetUpsertConfig,
"secondsSinceEpoch",
+ _partitionUpsertMetadataManager, null);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader =
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
_schema.getColumnNames(), null)) {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a603303bc8..015ba5705a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -56,8 +56,11 @@ public class MutableSegmentImplUpsertTest {
URL schemaResourceUrl =
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl =
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
- _tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, hashFunction)).build();
+ UpsertConfig upsertConfigWithHash = new
UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfigWithHash.setHashFunction(hashFunction);
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+ .build();
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
@@ -65,8 +68,7 @@ public class MutableSegmentImplUpsertTest {
hashFunction, null,
mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema,
Collections.emptySet(), Collections.emptySet(),
- Collections.emptySet(), false, true,
- new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null,
hashFunction), "secondsSinceEpoch",
+ Collections.emptySet(), false, true, upsertConfigWithHash,
"secondsSinceEpoch",
_partitionUpsertMetadataManager, null);
GenericRow reuse = new GenericRow();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9bca0ff9ce..41ede3df4d 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1227,8 +1227,7 @@ public class TableConfigUtilsTest {
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setDedupConfig(new DedupConfig(true, HashFunction.NONE))
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
- .setStreamConfigs(streamConfigs).build();
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Assert.fail();
@@ -1242,8 +1241,9 @@ public class TableConfigUtilsTest {
Schema schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.build();
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null)).build();
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Assert.fail();
@@ -1251,8 +1251,8 @@ public class TableConfigUtilsTest {
Assert.assertEquals(e.getMessage(), "Upsert/Dedup table is for realtime
table only.");
}
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null)).build();
+ tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Assert.fail();
@@ -1272,8 +1272,7 @@ public class TableConfigUtilsTest {
}
Map<String, String> streamConfigs = getStreamConfigs();
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1283,8 +1282,7 @@ public class TableConfigUtilsTest {
}
streamConfigs.put("stream.kafka.consumer.type", "simple");
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1294,8 +1292,7 @@ public class TableConfigUtilsTest {
"Upsert/Dedup table must use strict replica-group (i.e.
strictReplicaGroup) based routing");
}
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1303,8 +1300,7 @@ public class TableConfigUtilsTest {
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Lists.newArrayList("myCol"), null,
Collections.singletonList(
new AggregationFunctionColumnPair(AggregationFunctionType.COUNT,
"myCol").toColumnName()), 10);
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
try {
@@ -1315,8 +1311,7 @@ public class TableConfigUtilsTest {
}
//With Aggregate Metrics
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).build();
try {
@@ -1329,8 +1324,7 @@ public class TableConfigUtilsTest {
//With aggregation Configs in Ingestion Config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("twiceSum", "SUM(twice)")));
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).setIngestionConfig(ingestionConfig).build();
try {
@@ -1341,8 +1335,7 @@ public class TableConfigUtilsTest {
}
//With aggregation Configs in Ingestion Config and IndexingConfig at the same time
- tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null))
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).setIngestionConfig(ingestionConfig).build();
try {
@@ -1366,12 +1359,15 @@ public class TableConfigUtilsTest {
streamConfigs.put("stream.kafka.consumer.type", "simple");
Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.IGNORE);
- TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(
- new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE,
- "myCol2",
- null)).setNullHandlingEnabled(true)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .setStreamConfigs(streamConfigs).build();
+ UpsertConfig partialUpsertConfig = new
UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
+
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+ partialUpsertConfig.setComparisonColumn("myCol2");
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(partialUpsertConfig)
+ .setNullHandlingEnabled(true)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
Assert.fail();
@@ -1379,10 +1375,11 @@ public class TableConfigUtilsTest {
Assert.assertEquals(e.getMessage(), "Merger cannot be applied to
comparison column");
}
+ partialUpsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
+
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("myCol2")
- .setUpsertConfig(
- new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
- null)).setNullHandlingEnabled(true)
+ .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
try {
@@ -1394,9 +1391,7 @@ public class TableConfigUtilsTest {
partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeCol")
- .setUpsertConfig(
- new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, UpsertConfig.Strategy.OVERWRITE, null,
- null)).setNullHandlingEnabled(false)
+ .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
try {
@@ -1483,8 +1478,8 @@ public class TableConfigUtilsTest {
// invalid Upsert config with RealtimeToOfflineTask
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null, null)).setTaskConfig(
- new
TableTaskConfig(ImmutableMap.of("RealtimeToOfflineSegmentsTask",
realtimeToOfflineTaskConfig,
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(new TableTaskConfig(
+ ImmutableMap.of("RealtimeToOfflineSegmentsTask",
realtimeToOfflineTaskConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
TableConfigUtils.validateTaskConfigs(tableConfig, schema);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 947e5a86b4..f687be7501 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.spi.config.table;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.google.common.base.Preconditions;
@@ -41,21 +40,21 @@ public class UpsertConfig extends BaseJsonConfig {
}
@JsonPropertyDescription("Upsert mode.")
- private final Mode _mode;
+ private Mode _mode;
@JsonPropertyDescription("Function to hash the primary key.")
- private final HashFunction _hashFunction;
+ private HashFunction _hashFunction = HashFunction.NONE;
@JsonPropertyDescription("Partial update strategies.")
- private final Map<String, Strategy> _partialUpsertStrategies;
+ private Map<String, Strategy> _partialUpsertStrategies;
@JsonPropertyDescription("default upsert strategy for partial mode")
- private final Strategy _defaultPartialUpsertStrategy;
+ private Strategy _defaultPartialUpsertStrategy;
@JsonPropertyDescription("Column for upsert comparison, default to time
column")
- private final String _comparisonColumn;
+ private String _comparisonColumn;
- @JsonCreator
+ @Deprecated
public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
@JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy>
partialUpsertStrategies,
@JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy
defaultPartialUpsertStrategy,
@@ -77,6 +76,14 @@ public class UpsertConfig extends BaseJsonConfig {
_hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
}
+ public UpsertConfig(Mode mode) {
+ _mode = mode;
+ }
+
+ // Do not use this constructor. This is needed for JSON deserialization.
+ public UpsertConfig() {
+ }
+
public Mode getMode() {
return _mode;
}
@@ -97,4 +104,36 @@ public class UpsertConfig extends BaseJsonConfig {
public String getComparisonColumn() {
return _comparisonColumn;
}
+
+ public void setHashFunction(HashFunction hashFunction) {
+ _hashFunction = hashFunction;
+ }
+
+ /**
+ * PartialUpsertStrategies maintains the mapping of merge strategies per column.
+ * Each key in the map is a columnName, value is a partial upsert merging strategy.
+ * Supported strategies are {OVERWRITE|INCREMENT|APPEND|UNION|IGNORE}.
+ */
+ public void setPartialUpsertStrategies(Map<String, Strategy>
partialUpsertStrategies) {
+ _partialUpsertStrategies = partialUpsertStrategies;
+ }
+
+ /**
+ * If strategy is not specified for a column, the merger on that column will be "defaultPartialUpsertStrategy".
+ * The default value of defaultPartialUpsertStrategy is OVERWRITE.
+ */
+ public void setDefaultPartialUpsertStrategy(Strategy
defaultPartialUpsertStrategy) {
+ _defaultPartialUpsertStrategy = defaultPartialUpsertStrategy;
+ }
+
+ /**
+ * By default, Pinot uses the value in the time column to determine the latest record. For two records with the
+ * same primary key, the record with the larger value of the time column is picked as the
+ * latest update.
+ * However, there are cases when users need to use another column to determine the order.
+ * In such case, you can use option comparisonColumn to override the column used for comparison.
+ */
+ public void setComparisonColumn(String comparisonColumn) {
+ _comparisonColumn = comparisonColumn;
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index 37f96eb321..df5875fe33 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,21 +29,20 @@ public class UpsertConfigTest {
@Test
public void testUpsertConfig() {
- UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL,
null, null, null, null);
+ UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL);
assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
- upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
"comparison", null);
+ upsertConfig1.setComparisonColumn("comparison");
assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
- upsertConfig1 =
- new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison",
HashFunction.MURMUR3);
+ upsertConfig1.setHashFunction(HashFunction.MURMUR3);
assertEquals(upsertConfig1.getHashFunction(), HashFunction.MURMUR3);
+ UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
- UpsertConfig upsertConfig2 =
- new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies,
UpsertConfig.Strategy.OVERWRITE, null,
- null);
+ upsertConfig2.setPartialUpsertStrategies(partialUpsertStratgies);
+
upsertConfig2.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
assertEquals(upsertConfig2.getPartialUpsertStrategies(),
partialUpsertStratgies);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]