Jackie-Jiang commented on code in PR #17645:
URL: https://github.com/apache/pinot/pull/17645#discussion_r2784602365


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java:
##########
@@ -483,6 +483,9 @@ private SuccessResponse updateSchema(String schemaName, Schema schema, boolean r
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SCHEMA_UPLOAD_ERROR, 1L);
       throw new ControllerApplicationException(LOGGER, String.format("Failed to find table %s to reload", schemaName),
           Response.Status.NOT_FOUND, e);
+    } catch (IllegalArgumentException e) {

Review Comment:
   This should be modeled as `SchemaBackwardIncompatibleException`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1631,6 +1635,42 @@ private void updateSchema(Schema schema, Schema oldSchema, boolean forceTableSch
     LOGGER.info("Updated schema: {}", schemaName);
   }
 
+  /**
+   * Validates that primary key columns are not changed for upsert tables.
+   * Changing primary key columns can lead to data inconsistencies between replicas.
+   *
+   * @param schemaName the name of the schema
+   * @param oldSchema the existing schema
+   * @param newSchema the new schema being applied
+   * @throws IllegalArgumentException if primary key columns are changed and forceUpdate is false
+   */
+  private void validatePrimaryKeyColumnsUpdate(String schemaName, Schema oldSchema, Schema newSchema) {

Review Comment:
   We can simplify the logic by disallowing any change to primary key columns. Primary key columns are used in dimension tables, upsert, and dedup, all of which require special handling.
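
A minimal sketch of the simplified rule, assuming the check only needs to compare the primary key column lists of the old and new schema. `PrimaryKeyCheck` and `primaryKeyColumnsUnchanged` are hypothetical names, and plain `List<String>` values stand in for whatever the schema exposes as its primary key columns; this is not code from the PR.

```java
import java.util.List;

// Hypothetical helper, not part of Pinot: illustrates "reject any primary key change"
// without looking at the table type (dimension, upsert, or dedup).
final class PrimaryKeyCheck {
  private PrimaryKeyCheck() {
  }

  // Assumption: null and an empty list both mean "no primary key",
  // so switching between the two is not reported as a change.
  static boolean primaryKeyColumnsUnchanged(List<String> oldColumns, List<String> newColumns) {
    List<String> oldNormalized = oldColumns == null ? List.of() : oldColumns;
    List<String> newNormalized = newColumns == null ? List.of() : newColumns;
    // List.equals is order-sensitive, so a reordered key counts as a change in this sketch.
    return oldNormalized.equals(newNormalized);
  }
}
```

Because the comparison ignores the table type, the same check could be called from a schema-level compatibility method rather than from a separate upsert-only validator.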



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1570,6 +1572,8 @@ private void updateSchema(Schema schema, Schema oldSchema, boolean forceTableSch
       LOGGER.info("New schema: {} is the same as the existing schema, not updating it", schemaName);
       return;
     }
+
+    validatePrimaryKeyColumnsUpdate(schemaName, oldSchema, schema);

Review Comment:
   Integrate this into `Schema.isBackwardCompatibleWith()`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1173,6 +1174,156 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche
     }
   }
 
+  /**
+   * Validates that critical upsert configuration fields are not changed during table config update.
+   * Checks: mode, hashFunction, comparisonColumns, timeColumn (when no comparison columns),
+   * deleteRecordColumn, dropOutOfOrderRecord, outOfOrderRecordColumn,
+   * partialUpsertStrategies, defaultPartialUpsertStrategy.
+   *
+   * @param existingConfig the existing table config
+   * @param newConfig the new table config being applied
+   * @param force if true, logs a warning instead of throwing an exception
+   * @throws IllegalArgumentException if any critical config field is changed and force is false
+   */
+  public static void validateUpsertConfigUpdate(TableConfig existingConfig, TableConfig newConfig, boolean force) {

Review Comment:
   Add a method `void validateBackwardCompatibility(TableConfig newConfig, TableConfig existingConfig)`.
   The `force` flag should be handled on the caller side.
   (minor) To be consistent with schema, put the new config in front of the existing config.
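
A rough sketch of the split suggested here, assuming the validator always throws and the caller alone decides what `force` means. `ConfigView`, `ConfigCompatibility`, and `updateConfig` are hypothetical stand-ins invented for illustration; the real method would take `TableConfig` arguments and compare more fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the handful of table config fields compared in this sketch.
final class ConfigView {
  final String upsertMode;
  final String hashFunction;

  ConfigView(String upsertMode, String hashFunction) {
    this.upsertMode = upsertMode;
    this.hashFunction = hashFunction;
  }
}

final class ConfigCompatibility {
  private ConfigCompatibility() {
  }

  // New config first, existing config second. Always throws on an incompatible
  // change; it has no force parameter.
  static void validateBackwardCompatibility(ConfigView newConfig, ConfigView existingConfig) {
    List<String> violations = new ArrayList<>();
    if (!Objects.equals(existingConfig.upsertMode, newConfig.upsertMode)) {
      violations.add(String.format("mode (%s -> %s)", existingConfig.upsertMode, newConfig.upsertMode));
    }
    if (!Objects.equals(existingConfig.hashFunction, newConfig.hashFunction)) {
      violations.add(String.format("hashFunction (%s -> %s)", existingConfig.hashFunction, newConfig.hashFunction));
    }
    if (!violations.isEmpty()) {
      throw new IllegalArgumentException("Backward incompatible changes: " + violations);
    }
  }

  // Caller side: the force decision lives here, not in the validator.
  // Returns true if the update proceeded only because force was set.
  static boolean updateConfig(ConfigView newConfig, ConfigView existingConfig, boolean force) {
    try {
      validateBackwardCompatibility(newConfig, existingConfig);
      return false;
    } catch (IllegalArgumentException e) {
      if (!force) {
        throw e;
      }
      System.err.println("Forcing incompatible update: " + e.getMessage());
      return true;
    }
  }
}
```

Keeping `force` out of the validator means the same check can be reused by callers that never allow an override.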



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1173,6 +1174,156 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche
     }
   }
 
+  /**
+   * Validates that critical upsert configuration fields are not changed during table config update.
+   * Checks: mode, hashFunction, comparisonColumns, timeColumn (when no comparison columns),
+   * deleteRecordColumn, dropOutOfOrderRecord, outOfOrderRecordColumn,
+   * partialUpsertStrategies, defaultPartialUpsertStrategy.
+   *
+   * @param existingConfig the existing table config
+   * @param newConfig the new table config being applied
+   * @param force if true, logs a warning instead of throwing an exception
+   * @throws IllegalArgumentException if any critical config field is changed and force is false
+   */
+  public static void validateUpsertConfigUpdate(TableConfig existingConfig, TableConfig newConfig, boolean force) {
+    String tableName = existingConfig.getTableName();
+    List<String> violations = new ArrayList<>();
+
+    boolean existingUpsertEnabled = existingConfig.isUpsertEnabled();
+    boolean newUpsertEnabled = newConfig.isUpsertEnabled();
+
+    // Check if upsert is being added or removed
+    if (existingUpsertEnabled != newUpsertEnabled) {
+      if (existingUpsertEnabled) {
+        violations.add("upsertConfig cannot remove from existing upsert table");
+      } else {
+        violations.add("upsertConfig cannot add to existing non-upsert table");
+      }
+    } else if (existingUpsertEnabled) {
+      UpsertConfig existingUpsertConfig = existingConfig.getUpsertConfig();
+      UpsertConfig newUpsertConfig = newConfig.getUpsertConfig();
+
+      if (existingUpsertConfig.getMode() != newUpsertConfig.getMode()) {
+        violations.add(
+            String.format("mode (%s -> %s)", existingUpsertConfig.getMode(), newUpsertConfig.getMode()));
+      }
+      if (existingUpsertConfig.getHashFunction() != newUpsertConfig.getHashFunction()) {
+        violations.add(String.format("hashFunction (%s -> %s)", existingUpsertConfig.getHashFunction(),
+            newUpsertConfig.getHashFunction()));
+      }
+      if (!Objects.equals(existingUpsertConfig.getComparisonColumns(),
+          newUpsertConfig.getComparisonColumns())) {
+        violations.add(
+            String.format("comparisonColumns (%s -> %s)", existingUpsertConfig.getComparisonColumns(),
+                newUpsertConfig.getComparisonColumns()));
+      }
+      List<String> existingComparisonColumns = existingUpsertConfig.getComparisonColumns();
+      if (existingComparisonColumns == null || existingComparisonColumns.isEmpty()) {
+        String existingTimeColumn =
+            existingConfig.getValidationConfig() != null ? existingConfig.getValidationConfig().getTimeColumnName()
+                : null;
+        String newTimeColumn =
+            newConfig.getValidationConfig() != null ? newConfig.getValidationConfig().getTimeColumnName() : null;
+        if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+          violations.add(
+              String.format("timeColumnName (%s -> %s) - used as default comparison column", existingTimeColumn,
+                  newTimeColumn));
+        }
+      }
+      if (existingUpsertConfig.isDropOutOfOrderRecord() != newUpsertConfig.isDropOutOfOrderRecord()) {
+        violations.add(
+            String.format("dropOutOfOrderRecord (%s -> %s)", existingUpsertConfig.isDropOutOfOrderRecord(),
+                newUpsertConfig.isDropOutOfOrderRecord()));
+      }
+      if (!Objects.equals(existingUpsertConfig.getOutOfOrderRecordColumn(),
+          newUpsertConfig.getOutOfOrderRecordColumn())) {
+        violations.add(String.format("outOfOrderRecordColumn (%s -> %s)",
+            existingUpsertConfig.getOutOfOrderRecordColumn(), newUpsertConfig.getOutOfOrderRecordColumn()));
+      }
+      if (existingUpsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+        if (!Objects.equals(existingUpsertConfig.getPartialUpsertStrategies(),
+            newUpsertConfig.getPartialUpsertStrategies())) {
+          violations.add(String.format("partialUpsertStrategies (%s -> %s)",
+              existingUpsertConfig.getPartialUpsertStrategies(), newUpsertConfig.getPartialUpsertStrategies()));
+        }
+        if (existingUpsertConfig.getDefaultPartialUpsertStrategy()
+            != newUpsertConfig.getDefaultPartialUpsertStrategy()) {
+          violations.add(String.format("defaultPartialUpsertStrategy (%s -> %s)",
+              existingUpsertConfig.getDefaultPartialUpsertStrategy(),
+              newUpsertConfig.getDefaultPartialUpsertStrategy()));
+        }
+      }
+    }
+    handleViolations(tableName, violations, force, "upsert");
+  }
+
+  /**
+   * Validates that critical dedup configuration fields are not changed during table config update.
+   * Checks: dedupEnabled, hashFunction, dedupTimeColumn, timeColumnName (when dedupTimeColumn not specified).
+   *
+   * @param existingConfig the existing table config
+   * @param newConfig the new table config being applied
+   * @param force if true, logs a warning instead of throwing an exception
+   * @throws IllegalArgumentException if any critical config field is changed and force is false
+   */
+  public static void validateDedupConfigUpdate(TableConfig existingConfig, TableConfig newConfig, boolean force) {
+    String tableName = existingConfig.getTableName();
+    List<String> violations = new ArrayList<>();
+
+    boolean existingDedupEnabled = existingConfig.isDedupEnabled();
+    boolean newDedupEnabled = newConfig.isDedupEnabled();
+    if (existingDedupEnabled != newDedupEnabled) {
+      if (existingDedupEnabled) {
+        violations.add("dedupConfig cannot remove from existing dedup table");
+      } else {
+        violations.add("dedupConfig cannot add to existing non-dedup table");
+      }
+    } else if (existingDedupEnabled) {
+      DedupConfig existingDedupConfig = existingConfig.getDedupConfig();
+      DedupConfig newDedupConfig = newConfig.getDedupConfig();
+
+      if (existingDedupConfig.getHashFunction() != newDedupConfig.getHashFunction()) {
+        violations.add(String.format("hashFunction (%s -> %s)", existingDedupConfig.getHashFunction(),
+            newDedupConfig.getHashFunction()));
+      }
+
+      if (!Objects.equals(existingDedupConfig.getDedupTimeColumn(), newDedupConfig.getDedupTimeColumn())) {
+        violations.add(String.format("dedupTimeColumn (%s -> %s)", existingDedupConfig.getDedupTimeColumn(),
+            newDedupConfig.getDedupTimeColumn()));
+      }
+      String existingDedupTimeColumn = existingDedupConfig.getDedupTimeColumn();
+      if (existingDedupTimeColumn == null || existingDedupTimeColumn.isEmpty()) {
+        String existingTimeColumn =
+            existingConfig.getValidationConfig() != null ? existingConfig.getValidationConfig().getTimeColumnName()
+                : null;
+        String newTimeColumn =
+            newConfig.getValidationConfig() != null ? newConfig.getValidationConfig().getTimeColumnName() : null;
+        if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+          violations.add(
+              String.format("timeColumnName (%s -> %s) - used as default dedup time column", existingTimeColumn,
+                  newTimeColumn));
+        }
+      }
+    }
+    handleViolations(tableName, violations, force, "dedup");
+  }
+
+  private static void handleViolations(String tableName, List<String> violations, boolean force, String configType) {
+    if (!violations.isEmpty()) {
+      if (force) {
+        LOGGER.warn("Forcing a config: {} update for table {} with changes: {}."
+                + "This may cause data inconsistencies or data loss. Be cautious during compactions, and "
+                + "pause consumption beforehand and disable SNAPSHOT mode in upsertConfig and restart for the changes"
+                + " to kick in. If in doubt, recreate the table with the new configuration.", configType, tableName,
+            violations);
+      } else {
+        throw new IllegalArgumentException(String.format(

Review Comment:
   Let's introduce an exception `TableConfigBackwardIncompatibleException`
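
One possible shape for such an exception, sketched under the assumption that it is a checked exception carrying the list of violated fields. The constructor arguments and the `getViolations()` accessor are hypothetical and were chosen only to mirror the `tableName`, `configType`, and `violations` values visible in `handleViolations` above; the PR may define the class differently.

```java
import java.util.List;

// Hypothetical shape for the exception named in the review comment.
// Assumption: checked, so callers must either handle it or declare it.
class TableConfigBackwardIncompatibleException extends Exception {
  private final List<String> violations;

  TableConfigBackwardIncompatibleException(String tableName, String configType, List<String> violations) {
    super(String.format("Backward incompatible %s config change for table %s: %s", configType, tableName,
        violations));
    // Defensive copy so later changes to the caller's list do not alter the exception.
    this.violations = List.copyOf(violations);
  }

  List<String> getViolations() {
    return violations;
  }
}
```

A dedicated type lets the REST layer catch exactly this failure instead of a broad `IllegalArgumentException`.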



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -737,7 +737,10 @@ private void validateLogicalTableReference(String tableName, TableType tableType
   public ConfigSuccessResponse updateTableConfig(
       @ApiParam(value = "Name of the table to update", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "comma separated list of validation type(s) to skip. supported types: (ALL|TASK|UPSERT)")
-      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, @Context HttpHeaders headers,
+      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+      @ApiParam(value = "Force config changes")
+      @QueryParam("forceConfigUpdate") @DefaultValue("false") boolean forceConfigUpdate,

Review Comment:
   Make it consistent with schema update
   ```suggestion
         @QueryParam("force") @DefaultValue("false") boolean force,
   ```


