Copilot commented on code in PR #17458: URL: https://github.com/apache/pinot/pull/17458#discussion_r2674761721
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java: ########## @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Singleton class to manage the configuration for force commit and reload on consuming segments + * for upsert tables with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true + * with consistency mode NONE and replication > 1). + * + * This configuration is dynamically updatable via ZK cluster config without requiring a server restart. 
+ */ +public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class); + private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig(); + + private final AtomicBoolean _forceCommitReloadEnabled = + new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD); + + private UpsertInconsistentStateConfig() { + } + + public static UpsertInconsistentStateConfig getInstance() { + return INSTANCE; + } + + /** + * Checks if force commit/reload is allowed for the given table config. + */ + public boolean isForceCommitReloadAllowed(TableConfig tableConfig) { + return (_forceCommitReloadEnabled.get() || !TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)); + } + + /** + * Returns the current config key used for this setting. + */ + public String getConfigKey() { + return ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG; + } + + @Override + public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { + if (!changedConfigs.contains(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG)) { + return; + } + + String configValue = clusterConfigs.get(ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG); + boolean enabled = (configValue == null) + ? ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD + : Boolean.parseBoolean(configValue); + + boolean previousValue = _forceCommitReloadEnabled.getAndSet(enabled); + if (previousValue != enabled) { + LOGGER.info("Updated cluster config: {} from {} to {}", ConfigChangeListenerConstants.FORCE_COMMIT_RELOAD_CONFIG, + previousValue, enabled); Review Comment: The variable name `enabled` is misleading. This boolean represents whether force commit/reload is *allowed*, not whether the feature is *enabled*. Consider renaming to `forceCommitReloadAllowed` for clarity. 
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java: ########## @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.CommonConstants.ConfigChangeListenerConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Singleton class to manage the configuration for force commit and reload on consuming segments + * for upsert tables with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true + * with consistency mode NONE and replication > 1). + * + * This configuration is dynamically updatable via ZK cluster config without requiring a server restart. 
+ */ +public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class); + private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig(); + + private final AtomicBoolean _forceCommitReloadEnabled = + new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD); + + private UpsertInconsistentStateConfig() { + } + + public static UpsertInconsistentStateConfig getInstance() { + return INSTANCE; + } + + /** + * Checks if force commit/reload is allowed for the given table config. + */ + public boolean isForceCommitReloadAllowed(TableConfig tableConfig) { Review Comment: Potential NullPointerException if `tableConfig` is null. The method `TableConfigUtils.checkForInconsistentStateConfigs` no longer performs null checking and will throw NPE if tableConfig is null. Add null check before calling this method. ```suggestion public boolean isForceCommitReloadAllowed(TableConfig tableConfig) { if (tableConfig == null) { // When tableConfig is null, rely solely on the cluster-level flag to avoid NPE. 
return _forceCommitReloadEnabled.get(); } ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -1605,10 +1605,10 @@ public static void checkForDuplicates(List<String> columns) { } public static boolean checkForInconsistentStateConfigs(TableConfig tableConfig) { - return tableConfig != null && tableConfig.getUpsertConfig() != null && tableConfig.getReplication() > 1 && ( - tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL || ( - tableConfig.getUpsertConfig().isDropOutOfOrderRecord() - && tableConfig.getUpsertConfig().getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE)); + return tableConfig.getUpsertConfig() != null && tableConfig.getReplication() > 1 && ( Review Comment: Removed null check for `tableConfig` parameter. The method `checkForInconsistentStateConfigs` can receive null, which will cause a NullPointerException. The previous implementation checked `tableConfig != null` before accessing its methods. ```suggestion return tableConfig != null && tableConfig.getUpsertConfig() != null && tableConfig.getReplication() > 1 && ( ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -2573,17 +2573,20 @@ public PauseState resumeTopicsConsumption(String tableNameWithType, List<Integer private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) { // For partial-upsert tables or upserts with out-of-order events enabled, force-committing - // consuming segments is disabled. In some cases (especially when replication > 1), the - // server that consumed fewer rows was incorrectly selected as the winner, causing other - // servers to reconsume rows and resulting in inconsistent data when previous state must - // be referenced for add/update operations. - // TODO: Temporarily disabled until a proper fix is implemented. 
+ // consuming segments is disabled by default. In some cases (especially when replication > 1), + // the server that consumed fewer rows was incorrectly selected as the winner, causing other + // servers to reconsume rows and resulting in inconsistent data. + // This behavior can be controlled via cluster config without requiring server restart. TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); - if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) { + if (tableConfig == null) { + throw new IllegalStateException("Table config not found for table: " + tableNameWithType); + } + if (!UpsertInconsistentStateConfig.getInstance().isForceCommitReloadAllowed(tableConfig)) { throw new IllegalStateException( - "Force commit is not allowed when replication > 1 for partial-upsert tables, or for upsert tables" - + " when dropOutOfOrder is enabled with consistency mode: " + UpsertConfig.ConsistencyMode.NONE - + " for the table: " + tableNameWithType); + "Force commit disabled for table: " + tableNameWithType + + ". Table is configured as partial upsert or dropOutOfOrderRecord=true with replication > 1, " + + "which can cause data inconsistency during force commit. " + + "To override, set cluster config: " + UpsertInconsistentStateConfig.getInstance().getConfigKey()); Review Comment: The error message states 'Force commit disabled' but doesn't indicate whether the config is currently enabled or disabled. Consider including the current config value to help operators understand whether they need to enable or disable the setting.
########## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java: ########## @@ -350,6 +349,8 @@ public void testForceCommitWithNonConsumingSegmentsIsIgnored() { FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(); segmentManager._numReplicas = 1; segmentManager.makeTableConfig(); + when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn( + segmentManager._tableConfig); Review Comment: The removed tests (`testForceCommitUpsertWithOutOfOrderTable`, `testForceCommitPartialUpsertTableWithMultipleReplicas`, `testForceCommitPartialUpsertTableWithNoReplica`) verified important behavior for upsert tables with different configurations. Since the behavior is now configurable rather than hard-blocked, new tests should be added to verify: (1) force commit succeeds when config is enabled, (2) force commit fails when config is disabled for inconsistent state tables, and (3) the config change listener correctly updates the behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
