Jackie-Jiang commented on a change in pull request #6149: URL: https://github.com/apache/incubator-pinot/pull/6149#discussion_r505814806
########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC } } + /** + * Validates the upsert-related configurations + * + */ + private static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + return; + } + // primary key exists + if (schema.getPrimaryKeyColumns() == null || schema.getPrimaryKeyColumns().isEmpty()) { + throw new IllegalStateException("Upsert table must have primary key columns in the schema."); + } Review comment: ```suggestion Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()), "Upsert table must have primary key columns in the schema"); ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC } } + /** + * Validates the upsert-related configurations + * + */ + private static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + return; + } + // primary key exists + if (schema.getPrimaryKeyColumns() == null || schema.getPrimaryKeyColumns().isEmpty()) { + throw new IllegalStateException("Upsert table must have primary key columns in the schema."); + } + // replica group is configured for routing + if (tableConfig.getRoutingConfig() == null || !tableConfig.getRoutingConfig().getInstanceSelectorType() + .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) { + throw new IllegalStateException("Upsert table must use replicaGroup as the routing config."); + } + // consumer type must be low-level + if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) { + StreamConfig streamConfig = + new StreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs()); + if (streamConfig.getConsumerTypes().size() != 1 + || streamConfig.getConsumerTypes().get(0) != StreamConfig.ConsumerType.LOWLEVEL) { + throw new IllegalStateException("Upsert table must use low-level streaming consumer type."); + } Review comment: ```suggestion Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(), ""Upsert table must use low-level streaming consumer type"); ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC } } + /** + * Validates the upsert-related configurations + * Review comment: Please add some javadoc on the checks performed here ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC } } + /** + * Validates the upsert-related configurations + * + */ + private static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + return; + } + // primary key exists + if (schema.getPrimaryKeyColumns() == null || schema.getPrimaryKeyColumns().isEmpty()) { + throw new IllegalStateException("Upsert table must have primary key columns in the schema."); + } + // replica group is configured for routing + if (tableConfig.getRoutingConfig() == null || !tableConfig.getRoutingConfig().getInstanceSelectorType() + .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) { + throw new IllegalStateException("Upsert table must use replicaGroup as the routing config."); + } Review comment: This can potentially have NPE ```suggestion Preconditions.checkState(tableConfig.getRoutingConfig() != null && RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE. equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()), "Upsert table must use replica-group based routing") ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java ########## @@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable IngestionConfig ingestionC } } + /** + * Validates the upsert-related configurations + * + */ + private static void validateUpsertConfig(TableConfig tableConfig, Schema schema) { + if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) { + return; + } + // primary key exists + if (schema.getPrimaryKeyColumns() == null || schema.getPrimaryKeyColumns().isEmpty()) { + throw new IllegalStateException("Upsert table must have primary key columns in the schema."); + } + // replica group is configured for routing + if (tableConfig.getRoutingConfig() == null || !tableConfig.getRoutingConfig().getInstanceSelectorType() + .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) { + throw new IllegalStateException("Upsert table must use replicaGroup as the routing config."); + } + // consumer type must be low-level + if (tableConfig.getIndexingConfig() != null && tableConfig.getIndexingConfig().getStreamConfigs() != null) { + StreamConfig streamConfig = + new StreamConfig(tableConfig.getTableName(), tableConfig.getIndexingConfig().getStreamConfigs()); + if (streamConfig.getConsumerTypes().size() != 1 + || streamConfig.getConsumerTypes().get(0) != StreamConfig.ConsumerType.LOWLEVEL) { + throw new IllegalStateException("Upsert table must use low-level streaming consumer type."); + } + } + // check type type is realtime + if (tableConfig.getTableType() != TableType.REALTIME) { Review comment: Use precondition, also recommend reorder the checks in this order: upsert mode -> primary key -> table type -> consumer type -> routing type, which IMO is more intuitive ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org