Jackie-Jiang commented on a change in pull request #6149:
URL: https://github.com/apache/incubator-pinot/pull/6149#discussion_r505814806



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
     }
   }
 
+  /**
+   * Validates the upsert-related configurations
+   *
+   */
+  private static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
+      return;
+    }
+    // primary key exists
+    if (schema.getPrimaryKeyColumns() == null || 
schema.getPrimaryKeyColumns().isEmpty()) {
+      throw new IllegalStateException("Upsert table must have primary key 
columns in the schema.");
+    }

Review comment:
       ```suggestion
       
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
 "Upsert table must have primary key columns in the schema");
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
     }
   }
 
+  /**
+   * Validates the upsert-related configurations
+   *
+   */
+  private static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
+      return;
+    }
+    // primary key exists
+    if (schema.getPrimaryKeyColumns() == null || 
schema.getPrimaryKeyColumns().isEmpty()) {
+      throw new IllegalStateException("Upsert table must have primary key 
columns in the schema.");
+    }
+    // replica group is configured for routing
+    if (tableConfig.getRoutingConfig() == null || 
!tableConfig.getRoutingConfig().getInstanceSelectorType()
+        .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) 
{
+      throw new IllegalStateException("Upsert table must use replicaGroup as 
the routing config.");
+    }
+    // consumer type must be low-level
+    if (tableConfig.getIndexingConfig() != null && 
tableConfig.getIndexingConfig().getStreamConfigs() != null) {
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs());
+      if (streamConfig.getConsumerTypes().size() != 1
+          || streamConfig.getConsumerTypes().get(0) != 
StreamConfig.ConsumerType.LOWLEVEL) {
+        throw new IllegalStateException("Upsert table must use low-level 
streaming consumer type.");
+      }

Review comment:
       ```suggestion
         Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType(), ""Upsert table must use low-level 
streaming consumer type");
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
     }
   }
 
+  /**
+   * Validates the upsert-related configurations
+   *

Review comment:
       Please add some javadoc on the checks performed here

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
     }
   }
 
+  /**
+   * Validates the upsert-related configurations
+   *
+   */
+  private static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
+      return;
+    }
+    // primary key exists
+    if (schema.getPrimaryKeyColumns() == null || 
schema.getPrimaryKeyColumns().isEmpty()) {
+      throw new IllegalStateException("Upsert table must have primary key 
columns in the schema.");
+    }
+    // replica group is configured for routing
+    if (tableConfig.getRoutingConfig() == null || 
!tableConfig.getRoutingConfig().getInstanceSelectorType()
+        .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) 
{
+      throw new IllegalStateException("Upsert table must use replicaGroup as 
the routing config.");
+    }

Review comment:
       This can potentially have NPE
   ```suggestion
       Preconditions.checkState(tableConfig.getRoutingConfig() != null && 
RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE. 
equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()), 
"Upsert table must use replica-group based routing")
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
##########
@@ -230,6 +234,38 @@ private static void validateIngestionConfig(@Nullable 
IngestionConfig ingestionC
     }
   }
 
+  /**
+   * Validates the upsert-related configurations
+   *
+   */
+  private static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
+      return;
+    }
+    // primary key exists
+    if (schema.getPrimaryKeyColumns() == null || 
schema.getPrimaryKeyColumns().isEmpty()) {
+      throw new IllegalStateException("Upsert table must have primary key 
columns in the schema.");
+    }
+    // replica group is configured for routing
+    if (tableConfig.getRoutingConfig() == null || 
!tableConfig.getRoutingConfig().getInstanceSelectorType()
+        .equalsIgnoreCase(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) 
{
+      throw new IllegalStateException("Upsert table must use replicaGroup as 
the routing config.");
+    }
+    // consumer type must be low-level
+    if (tableConfig.getIndexingConfig() != null && 
tableConfig.getIndexingConfig().getStreamConfigs() != null) {
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), 
tableConfig.getIndexingConfig().getStreamConfigs());
+      if (streamConfig.getConsumerTypes().size() != 1
+          || streamConfig.getConsumerTypes().get(0) != 
StreamConfig.ConsumerType.LOWLEVEL) {
+        throw new IllegalStateException("Upsert table must use low-level 
streaming consumer type.");
+      }
+    }
+    // check type type is realtime
+    if (tableConfig.getTableType() != TableType.REALTIME) {

Review comment:
       Use precondition, also recommend reorder the checks in this order: 
upsert mode -> primary key -> table type -> consumer type -> routing type, 
which IMO is more intuitive




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to