Jackie-Jiang commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r655586939
##########
File path:
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
Assert.assertEquals(e.getMessage(), "The upsert table cannot have
star-tree index.");
}
}
+
+ @Test
+ public void testValidatePartialUpsertConfig() {
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1",
FieldSpec.DataType.LONG)
+ .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs
+ .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setStreamConfigs(streamConfigs).build();
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be
enabled for partial upsert tables.");
+ }
+
+ Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
+ partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied
to PK.");
+ }
+
+ partialUpsertStratgies = new HashMap<>();
+ partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric
data types.");
Review comment:
@yupeng9 The wait time is not really a time column, but a dimension
column. The time column must be a timestamp or datetime. There is no limit on
the data type, as long as the value is stored as a numeric type.
##########
File path:
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
Assert.assertEquals(e.getMessage(), "The upsert table cannot have
star-tree index.");
}
}
+
+ @Test
+ public void testValidatePartialUpsertConfig() {
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1",
FieldSpec.DataType.LONG)
+ .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs
+ .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setStreamConfigs(streamConfigs).build();
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be
enabled for partial upsert tables.");
+ }
+
+ Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
+ partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied
to PK.");
+ }
+
+ partialUpsertStratgies = new HashMap<>();
+ partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric
data types.");
Review comment:
It won't fail, but we still should not try to increment the timestamp,
because there is no guarantee that the base value has already been ingested
(or, at the very least, that is very hard to manage).
##########
File path:
pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
Assert.assertEquals(e.getMessage(), "The upsert table cannot have
star-tree index.");
}
}
+
+ @Test
+ public void testValidatePartialUpsertConfig() {
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1",
FieldSpec.DataType.LONG)
+ .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+ .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs
+ .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+ .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setStreamConfigs(streamConfigs).build();
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be
enabled for partial upsert tables.");
+ }
+
+ Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
+ partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied
to PK.");
+ }
+
+ partialUpsertStratgies = new HashMap<>();
+ partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+ try {
+ TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric
data types.");
Review comment:
Yes. IMO the wait time should be a long, instead of a timestamp
(milliseconds since epoch).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]