This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4633cf58866 Add checks to not update upsert / dedup configs after
table creation (#17645)
4633cf58866 is described below
commit 4633cf58866901a29887ec7004e912f377d12d60
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Wed Mar 4 12:36:23 2026 -0800
Add checks to not update upsert / dedup configs after table creation
(#17645)
---
.../TableConfigBackwardIncompatibleException.java | 26 ++++
.../api/resources/PinotTableRestletResource.java | 12 +-
.../api/resources/TableConfigsRestletResource.java | 13 +-
.../helix/core/PinotHelixResourceManager.java | 77 ++++++++++-
.../RealtimeOffsetAutoResetKafkaHandler.java | 14 +-
.../helix/LogicalTableMetadataCacheTest.java | 2 +-
.../segment/local/utils/TableConfigUtils.java | 143 +++++++++++++++++++++
.../java/org/apache/pinot/spi/data/Schema.java | 12 ++
8 files changed, 280 insertions(+), 19 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
new file mode 100644
index 00000000000..21125a3b0f7
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.exception;
+
+public class TableConfigBackwardIncompatibleException extends Exception {
+
+ public TableConfigBackwardIncompatibleException(String message) {
+ super(message);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 9d934c4d184..498f0349beb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -79,6 +79,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.RebalanceInProgressException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
+import
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -727,7 +728,10 @@ public class PinotTableRestletResource {
public ConfigSuccessResponse updateTableConfig(
@ApiParam(value = "Name of the table to update", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "comma separated list of validation type(s) to skip.
supported types: (ALL|TASK|UPSERT)")
- @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
@Context HttpHeaders headers,
+ @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+ @ApiParam(value = "Force config changes")
+ @QueryParam("force") @DefaultValue("false") boolean force,
+ @Context HttpHeaders headers,
String tableConfigString)
throws Exception {
Pair<TableConfig, Map<String, Object>>
tableConfigAndUnrecognizedProperties;
@@ -762,7 +766,11 @@ public class PinotTableRestletResource {
throw new ControllerApplicationException(LOGGER, "Table " +
tableNameWithType + " does not exist",
Response.Status.NOT_FOUND);
}
- _pinotHelixResourceManager.updateTableConfig(tableConfig);
+ _pinotHelixResourceManager.updateTableConfig(tableConfig, force);
+ } catch (TableConfigBackwardIncompatibleException e) {
+ String errStr = String.format("Failed to update configuration for %s due
to: %s", tableName, e.getMessage());
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
1L);
+ throw new ControllerApplicationException(LOGGER, errStr,
Response.Status.BAD_REQUEST, e);
} catch (InvalidTableConfigException e) {
String errStr = String.format("Failed to update configuration for %s due
to: %s", tableName, e.getMessage());
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
1L);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index 72bb48a1878..744d5a1dd22 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -357,8 +358,8 @@ public class TableConfigsRestletResource {
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.UPDATE_TABLE_CONFIGS)
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Update the TableConfigs provided by the
tableConfigsStr json",
- notes = "Update the TableConfigs provided by the tableConfigsStr json")
+ @ApiOperation(value = "Update the TableConfigs provided by the
tableConfigsStr json", notes = "Update the "
+ + "TableConfigs provided by the tableConfigsStr json")
public ConfigSuccessResponse updateConfig(
@ApiParam(value = "TableConfigs name i.e. raw table name", required =
true) @PathParam("tableName")
String tableName,
@@ -405,7 +406,7 @@ public class TableConfigsRestletResource {
if (offlineTableConfig != null) {
tuneConfig(offlineTableConfig, schema);
if (_pinotHelixResourceManager.hasOfflineTable(tableName)) {
- _pinotHelixResourceManager.updateTableConfig(offlineTableConfig);
+ _pinotHelixResourceManager.updateTableConfig(offlineTableConfig,
forceTableSchemaUpdate);
LOGGER.info("Updated offline table config: {}", tableName);
} else {
_pinotHelixResourceManager.addTable(offlineTableConfig);
@@ -415,13 +416,17 @@ public class TableConfigsRestletResource {
if (realtimeTableConfig != null) {
tuneConfig(realtimeTableConfig, schema);
if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) {
- _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig);
+ _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig,
forceTableSchemaUpdate);
LOGGER.info("Updated realtime table config: {}", tableName);
} else {
_pinotHelixResourceManager.addTable(realtimeTableConfig);
LOGGER.info("Created realtime table config: {}", tableName);
}
}
+ } catch (TableConfigBackwardIncompatibleException e) {
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
1L);
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Invalid TableConfigs for: %s, %s", tableName,
e.getMessage()), Response.Status.BAD_REQUEST, e);
} catch (InvalidTableConfigException e) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
1L);
throw new ControllerApplicationException(LOGGER,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d0f204087b0..8ae187ab3a6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -95,6 +96,7 @@ import
org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
+import
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
@@ -155,6 +157,7 @@ import
org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.workload.QueryWorkloadManager;
import org.apache.pinot.core.util.NumberUtils;
import org.apache.pinot.core.util.NumericException;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
@@ -1571,6 +1574,7 @@ public class PinotHelixResourceManager {
LOGGER.info("New schema: {} is the same as the existing schema, not
updating it", schemaName);
return;
}
+
boolean isBackwardCompatible = schema.isBackwardCompatibleWith(oldSchema);
if (!isBackwardCompatible) {
if (forceTableSchemaUpdate) {
@@ -1582,6 +1586,17 @@ public class PinotHelixResourceManager {
.append(" is not backward-compatible with the existing schema.");
errorMsg.append("\n\nIncompatibility Details:");
+ // Check for primary key column changes
+ // Allow adding primary keys if not present. Helps add upsert and
dedup configs to existing tables.
+ List<String> oldPrimaryKeys = oldSchema.getPrimaryKeyColumns();
+ List<String> newPrimaryKeys = schema.getPrimaryKeyColumns();
+ if (CollectionUtils.isNotEmpty(oldPrimaryKeys)) {
+ if (!Objects.equals(oldPrimaryKeys, newPrimaryKeys)) {
+ errorMsg.append("\n- Primary key columns changed
(").append(oldPrimaryKeys).append(" -> ")
+ .append(newPrimaryKeys).append(")");
+ }
+ }
+
// Check for missing columns
Set<String> newSchemaColumns = schema.getColumnNames();
List<String> missingColumns = new ArrayList<>();
@@ -1621,9 +1636,10 @@ public class PinotHelixResourceManager {
errorMsg.append("\n\nSuggestions to fix:");
errorMsg.append("\n1. Ensure all columns from the existing schema are
retained in the new schema");
errorMsg.append("\n2. Do not change the data type or field type of
existing columns");
- errorMsg.append("\n3. New columns should be added as optional fields
with default values");
- errorMsg.append("\n4. If you must make breaking changes, consider
creating a new schema version or use "
- + "forceTableSchemaUpdate=true (use with caution)");
+ errorMsg.append("\n3. Do not change primary key columns");
+ errorMsg.append("\n4. New columns should be added as optional fields
with default values");
+ errorMsg.append("\n5. If you must make breaking changes, consider
creating a new schema version or use "
+ + "force=true (use with caution)");
throw new SchemaBackwardIncompatibleException(errorMsg.toString());
}
@@ -2145,12 +2161,25 @@ public class PinotHelixResourceManager {
/**
* Validate the table config and update it
* @throws IOException
+ * @throws TableConfigBackwardIncompatibleException if config changes are
backward incompatible
*/
public void updateTableConfig(TableConfig tableConfig)
- throws IOException {
+ throws IOException, TableConfigBackwardIncompatibleException {
+ updateTableConfig(tableConfig, false);
+ }
+
+ /**
+ * Validate the table config and update it
+ * @param tableConfig the table config to update
+ * @param force if true, allows upsert/dedup config changes with a warning
+ * @throws IOException
+ * @throws TableConfigBackwardIncompatibleException if config changes are
backward incompatible and force is false
+ */
+ public void updateTableConfig(TableConfig tableConfig, boolean force)
+ throws IOException, TableConfigBackwardIncompatibleException {
validateTableTenantConfig(tableConfig);
validateTableTaskMinionInstanceTagConfig(tableConfig);
- setExistingTableConfig(tableConfig);
+ setExistingTableConfig(tableConfig, -1, force);
}
/**
@@ -2158,7 +2187,7 @@ public class PinotHelixResourceManager {
* TODO - Make this private and always use updateTableConfig ?
*/
public void setExistingTableConfig(TableConfig tableConfig)
- throws IOException {
+ throws IOException, TableConfigBackwardIncompatibleException {
setExistingTableConfig(tableConfig, -1);
}
@@ -2259,9 +2288,43 @@ public class PinotHelixResourceManager {
/**
* Sets the given table config into zookeeper with the expected version,
which is the previous tableConfig znRecord
* version. If the expected version is -1, the version check is ignored.
+ *
+ * @throws TableConfigBackwardIncompatibleException if config changes are
backward incompatible
+ */
+ public void setExistingTableConfig(TableConfig tableConfig, int
expectedVersion)
+ throws TableConfigBackwardIncompatibleException {
+ setExistingTableConfig(tableConfig, expectedVersion, false);
+ }
+
+ /**
+ * Sets the given table config into zookeeper with the expected version.
+ *
+ * @param tableConfig the table config to set
+ * @param expectedVersion the expected version (-1 to ignore version check)
+ * @param force if true, allows upsert/dedup config changes with a warning
+ * @throws TableConfigBackwardIncompatibleException if config changes are
backward incompatible and force is false
*/
- public void setExistingTableConfig(TableConfig tableConfig, int
expectedVersion) {
+ public void setExistingTableConfig(TableConfig tableConfig, int
expectedVersion, boolean force)
+ throws TableConfigBackwardIncompatibleException {
String tableNameWithType = tableConfig.getTableName();
+ TableConfig existingTableConfig = getTableConfig(tableNameWithType);
+ if (existingTableConfig != null) {
+ List<String> violations =
TableConfigUtils.validateBackwardCompatibility(tableConfig,
existingTableConfig);
+ if (!violations.isEmpty()) {
+ String tableName = tableConfig.getTableName();
+ if (force) {
+ LOGGER.warn("Forcing a config update for table: {} with violations:
{}. "
+ + "This may cause data inconsistencies or data loss. Be cautious
during compactions, and "
+ + "pause consumption beforehand and disable SNAPSHOT mode in
upsertConfig and restart for the changes"
+ + " to kick in. If in doubt, recreate the table with the new
configuration.", tableName, violations);
+ } else {
+ throw new TableConfigBackwardIncompatibleException(String.format(
+ "Failed to update table '%s': Cannot modify %s as it may lead to
data inconsistencies. "
+ + "Please create a new table instead.", tableName,
violations));
+ }
+ }
+ }
+
if (!ZKMetadataProvider.setTableConfig(_propertyStore, tableConfig,
expectedVersion)) {
throw new RuntimeException(
"Failed to update table config in Zookeeper for table: " +
tableNameWithType + " with" + " expected version: "
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
index 363455b0190..c17d9c6152e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -70,12 +71,12 @@ public abstract class RealtimeOffsetAutoResetKafkaHandler
implements RealtimeOff
* @return true if successfully started the backfill job and its ingestion
*/
@Override
- public boolean triggerBackfillJob(
- String tableNameWithType, StreamConfig streamConfig, String topicName,
int partitionId, long fromOffset,
- long toOffset) {
+ public boolean triggerBackfillJob(String tableNameWithType, StreamConfig
streamConfig, String topicName,
+ int partitionId, long fromOffset, long toOffset) {
// Trigger the data replication and get the new topic's stream config.
- Map<String, String> newTopicStreamConfig =
triggerDataReplicationAndGetTopicInfo(
- tableNameWithType, streamConfig, topicName, partitionId, fromOffset,
toOffset);
+ Map<String, String> newTopicStreamConfig =
+ triggerDataReplicationAndGetTopicInfo(tableNameWithType, streamConfig,
topicName, partitionId, fromOffset,
+ toOffset);
if (newTopicStreamConfig == null) {
return false;
}
@@ -88,6 +89,9 @@ public abstract class RealtimeOffsetAutoResetKafkaHandler
implements RealtimeOff
} catch (IOException e) {
LOGGER.error("Cannot add backfill topic to the table config", e);
return false;
+ } catch (TableConfigBackwardIncompatibleException e) {
+ LOGGER.error("Cannot apply backfill config changes to the table config", e);
+ return false;
}
return true;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
index 6b8434d3ed3..2fcc5a35a54 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
@@ -105,7 +105,7 @@ public class LogicalTableMetadataCacheTest {
@Test
public void testLogicalTableCacheWithUpdates()
- throws IOException {
+ throws Exception {
String logicalTableName = "testLogicalTable1";
LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
logicalTableName, List.of(_offlineTableConfig.getTableName(),
_realtimeTableConfig.getTableName()));
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a8453a1c3bf..0947c5f6938 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -1183,6 +1184,148 @@ public final class TableConfigUtils {
}
}
+ /**
+ * Validates backward compatibility for table config updates.
+ * Checks critical upsert and dedup configuration fields that should not be
changed.
+ *
+ * @param newConfig the new table config being applied
+ * @param existingConfig the existing table config
+ * @return list of violations (empty if no violations)
+ */
+ public static List<String> validateBackwardCompatibility(TableConfig
newConfig, TableConfig existingConfig) {
+ List<String> violations = new ArrayList<>();
+ validateUpsertConfigUpdate(newConfig, existingConfig, violations);
+ validateDedupConfigUpdate(newConfig, existingConfig, violations);
+
+ return violations;
+ }
+
+ /**
+ * Validates that critical upsert configuration fields are not changed
during table config update.
+ * Checks: mode, hashFunction, comparisonColumns, timeColumn (when no
comparison columns),
+ * deleteRecordColumn, dropOutOfOrderRecord, outOfOrderRecordColumn,
+ * partialUpsertStrategies, defaultPartialUpsertStrategy.
+ *
+ * @param newConfig the new table config being applied
+ * @param existingConfig the existing table config
+ * @param violations list to collect violation messages
+ */
+ private static void validateUpsertConfigUpdate(TableConfig newConfig,
TableConfig existingConfig,
+ List<String> violations) {
+ boolean existingUpsertEnabled = existingConfig.isUpsertEnabled();
+ boolean newUpsertEnabled = newConfig.isUpsertEnabled();
+
+ // Check if upsert is being added or removed
+ if (existingUpsertEnabled != newUpsertEnabled) {
+ if (existingUpsertEnabled) {
+ LOGGER.info("upsertConfig is removed from existing upsert table: {}",
newConfig.getTableName());
+ } else {
+ LOGGER.info("upsertConfig is added to existing non-upsert table: {}",
newConfig.getTableName());
+ }
+ } else if (existingUpsertEnabled) {
+ UpsertConfig existingUpsertConfig = existingConfig.getUpsertConfig();
+ UpsertConfig newUpsertConfig = newConfig.getUpsertConfig();
+
+ if (existingUpsertConfig.getMode() != newUpsertConfig.getMode()) {
+ violations.add(
+ String.format("upsertConfig.mode (%s -> %s)",
existingUpsertConfig.getMode(), newUpsertConfig.getMode()));
+ }
+ if (existingUpsertConfig.getHashFunction() !=
newUpsertConfig.getHashFunction()) {
+ violations.add(String.format("upsertConfig.hashFunction (%s -> %s)",
existingUpsertConfig.getHashFunction(),
+ newUpsertConfig.getHashFunction()));
+ }
+ if (!Objects.equals(existingUpsertConfig.getComparisonColumns(),
+ newUpsertConfig.getComparisonColumns())) {
+ violations.add(
+ String.format("upsertConfig.comparisonColumns (%s -> %s)",
existingUpsertConfig.getComparisonColumns(),
+ newUpsertConfig.getComparisonColumns()));
+ }
+ List<String> existingComparisonColumns =
existingUpsertConfig.getComparisonColumns();
+ if (existingComparisonColumns == null ||
existingComparisonColumns.isEmpty()) {
+ String existingTimeColumn =
+ existingConfig.getValidationConfig() != null ?
existingConfig.getValidationConfig().getTimeColumnName()
+ : null;
+ String newTimeColumn =
+ newConfig.getValidationConfig() != null ?
newConfig.getValidationConfig().getTimeColumnName() : null;
+ if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+ violations.add(
+ String.format("timeColumnName (%s -> %s) - used as default
comparison column", existingTimeColumn,
+ newTimeColumn));
+ }
+ }
+ if (existingUpsertConfig.isDropOutOfOrderRecord() !=
newUpsertConfig.isDropOutOfOrderRecord()) {
+ violations.add(
+ String.format("upsertConfig.dropOutOfOrderRecord (%s -> %s)",
existingUpsertConfig.isDropOutOfOrderRecord(),
+ newUpsertConfig.isDropOutOfOrderRecord()));
+ }
+ if (!Objects.equals(existingUpsertConfig.getOutOfOrderRecordColumn(),
+ newUpsertConfig.getOutOfOrderRecordColumn())) {
+ violations.add(String.format("upsertConfig.outOfOrderRecordColumn (%s
-> %s)",
+ existingUpsertConfig.getOutOfOrderRecordColumn(),
newUpsertConfig.getOutOfOrderRecordColumn()));
+ }
+ if (existingUpsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+ if (!Objects.equals(existingUpsertConfig.getPartialUpsertStrategies(),
+ newUpsertConfig.getPartialUpsertStrategies())) {
+ violations.add(String.format("upsertConfig.partialUpsertStrategies
(%s -> %s)",
+ existingUpsertConfig.getPartialUpsertStrategies(),
newUpsertConfig.getPartialUpsertStrategies()));
+ }
+ if (existingUpsertConfig.getDefaultPartialUpsertStrategy()
+ != newUpsertConfig.getDefaultPartialUpsertStrategy()) {
+
violations.add(String.format("upsertConfig.defaultPartialUpsertStrategy (%s ->
%s)",
+ existingUpsertConfig.getDefaultPartialUpsertStrategy(),
+ newUpsertConfig.getDefaultPartialUpsertStrategy()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Validates that critical dedup configuration fields are not changed during
table config update.
+ * Checks: dedupEnabled, hashFunction, dedupTimeColumn, timeColumnName (when
dedupTimeColumn not specified).
+ *
+ * @param newConfig the new table config being applied
+ * @param existingConfig the existing table config
+ * @param violations list to collect violation messages
+ */
+ private static void validateDedupConfigUpdate(TableConfig newConfig,
TableConfig existingConfig,
+ List<String> violations) {
+ boolean existingDedupEnabled = existingConfig.isDedupEnabled();
+ boolean newDedupEnabled = newConfig.isDedupEnabled();
+ if (existingDedupEnabled != newDedupEnabled) {
+ if (existingDedupEnabled) {
+ LOGGER.info("dedupConfig is removed from existing dedup table: {}",
newConfig.getTableName());
+ } else {
+ LOGGER.info("dedupConfig is added to the existing non-dedup table:
{}", newConfig.getTableName());
+ }
+ } else if (existingDedupEnabled) {
+ DedupConfig existingDedupConfig = existingConfig.getDedupConfig();
+ DedupConfig newDedupConfig = newConfig.getDedupConfig();
+
+ if (existingDedupConfig.getHashFunction() !=
newDedupConfig.getHashFunction()) {
+ violations.add(String.format("dedupConfig.hashFunction (%s -> %s)",
existingDedupConfig.getHashFunction(),
+ newDedupConfig.getHashFunction()));
+ }
+
+ if (!Objects.equals(existingDedupConfig.getDedupTimeColumn(),
newDedupConfig.getDedupTimeColumn())) {
+ violations.add(String.format("dedupConfig.dedupTimeColumn (%s -> %s)",
existingDedupConfig.getDedupTimeColumn(),
+ newDedupConfig.getDedupTimeColumn()));
+ }
+ String existingDedupTimeColumn =
existingDedupConfig.getDedupTimeColumn();
+ if (existingDedupTimeColumn == null ||
existingDedupTimeColumn.isEmpty()) {
+ String existingTimeColumn =
+ existingConfig.getValidationConfig() != null ?
existingConfig.getValidationConfig().getTimeColumnName()
+ : null;
+ String newTimeColumn =
+ newConfig.getValidationConfig() != null ?
newConfig.getValidationConfig().getTimeColumnName() : null;
+ if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+ violations.add(
+ String.format("timeColumnName (%s -> %s) - used as default dedup
time column", existingTimeColumn,
+ newTimeColumn));
+ }
+ }
+ }
+ }
+
/**
* Validates task configuration to ensure no conflicting task types are
configured.
*/
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 57e2f9f27ca..a21961357c2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
@@ -840,10 +842,20 @@ public final class Schema implements Serializable {
* Backward compatibility requires
* (1) all columns in oldSchema should be retained.
* (2) all column fieldSpecs should be backward compatible with the old ones.
* (3) primary key columns should not be changed if present (used in
dimension tables, upsert, and dedup).
*
* @param oldSchema old schema
*/
public boolean isBackwardCompatibleWith(Schema oldSchema) {
+ List<String> oldPrimaryKeys = oldSchema.getPrimaryKeyColumns();
+ List<String> newPrimaryKeys = getPrimaryKeyColumns();
+ // Allow adding primary keys if not present. Helps add upsert and dedup
configs to existing tables.
+ if (CollectionUtils.isNotEmpty(oldPrimaryKeys)) {
+ if (!Objects.equals(oldPrimaryKeys, newPrimaryKeys)) {
+ return false;
+ }
+ }
+
Set<String> columnNames = getColumnNames();
for (Map.Entry<String, FieldSpec> entry :
oldSchema.getFieldSpecMap().entrySet()) {
String oldSchemaColumnName = entry.getKey();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]