Jackie-Jiang commented on code in PR #17503:
URL: https://github.com/apache/pinot/pull/17503#discussion_r2722837497
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2171,12 +2171,33 @@ public static class ConfigChangeListenerConstants {
* Cluster config key to control whether force commit/reload is allowed for upsert tables
* with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
* with consistency mode NONE and replication > 1).
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ * @deprecated Use {@link #CONSUMING_SEGMENT_COMMIT_CONFIG} instead.
*/
+ @Deprecated
public static final String FORCE_COMMIT_RELOAD_CONFIG = "pinot.server.upsert.force.commit.reload";
+ /**
+ * Cluster config key to control the commit mode for consuming segments in upsert tables.
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ */
+ public static final String CONSUMING_SEGMENT_COMMIT_CONFIG = "pinot.server.consuming.segment.commit.mode";
Review Comment:
This key is confusing. Commit mode could mean pauseless, split (commit in 2 steps), etc.
We use this mode to define how to handle the inconsistency during consuming segment commit for upsert and dedup tables. Let's come up with a more specific name.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2171,12 +2171,33 @@ public static class ConfigChangeListenerConstants {
* Cluster config key to control whether force commit/reload is allowed for upsert tables
* with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
* with consistency mode NONE and replication > 1).
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ * @deprecated Use {@link #CONSUMING_SEGMENT_COMMIT_CONFIG} instead.
*/
+ @Deprecated
public static final String FORCE_COMMIT_RELOAD_CONFIG = "pinot.server.upsert.force.commit.reload";
+ /**
+ * Cluster config key to control the commit mode for consuming segments in upsert tables.
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ */
+ public static final String CONSUMING_SEGMENT_COMMIT_CONFIG = "pinot.server.consuming.segment.commit.mode";
+
/**
* Default value: true (force commit/reload is allowed by default).
+ * @deprecated Use {@link #DEFAULT_CONSUMING_COMMIT_MODE} instead.
*/
+ @Deprecated
Review Comment:
This can also be removed
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentCommitModeProvider.java:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provider for accessing the current Consuming segment Commit Mode from anywhere in the codebase.
+ * This allows pinot-segment-local to access the dynamically configured mode without
+ * depending on pinot-core.
+ *
+ * The supplier is registered by UpsertInconsistentStateConfig during server/controller startup.
+ */
+public final class ConsumingSegmentCommitModeProvider {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentCommitModeProvider.class);
+ private static final Supplier<Mode> DEFAULT_SUPPLIER = () -> Mode.NONE;
+
+ public enum Mode {
+ /**
+ * Reload is disabled for tables with inconsistent state configurations.
+ * Safe option that prevents potential data inconsistency issues.
+ */
+ NONE(false),
+
+ /**
+ * Reload is enabled but tables with partial upsert or dropOutOfOrderRecord=true (with replication > 1) will be
+ * skipped. When inconsistencies are detected during reload/force commit, upsert metadata is reverted.
+ */
+ PROTECTED(true),
+
+ /**
+ * Reload is enabled for all tables regardless of their configuration.
+ * Use with caution as this may cause data inconsistency for partial-upsert tables
+ * or upsert tables with dropOutOfOrderRecord/ outOfOrderRecordColumn enabled when replication > 1.
+ * Inconsistency checks and metadata revert are skipped.
+ */
+ UNSAFE(true);
+
+ private final boolean _reloadEnabled;
+
+ Mode(boolean reloadEnabled) {
+ _reloadEnabled = reloadEnabled;
+ }
+
+ public boolean isReloadEnabled() {
Review Comment:
This is attached to force commit, not reload. Let's rename it to `isForceCommitAllowed()`.
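A minimal sketch of the suggested rename, assuming the enum keeps its three constants and only the accessor (and its backing field) change name. This is a standalone illustration with the enum pulled out to top level, not the PR's final code.

```java
/**
 * Sketch of the Mode enum with isReloadEnabled() renamed to isForceCommitAllowed(),
 * because the flag only gates force commit, not reload.
 */
enum Mode {
  /** Force commit is not allowed for tables with inconsistent state configurations. */
  NONE(false),
  /** Force commit is allowed; upsert metadata is reverted if inconsistencies are detected. */
  PROTECTED(true),
  /** Force commit is allowed; inconsistency checks and metadata revert are skipped. */
  UNSAFE(true);

  // Renamed from _reloadEnabled to match the accessor.
  private final boolean _forceCommitAllowed;

  Mode(boolean forceCommitAllowed) {
    _forceCommitAllowed = forceCommitAllowed;
  }

  public boolean isForceCommitAllowed() {
    return _forceCommitAllowed;
  }
}
```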
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentCommitModeProvider.java:
##########
@@ -0,0 +1,134 @@
+package org.apache.pinot.spi.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provider for accessing the current Consuming segment Commit Mode from anywhere in the codebase.
+ * This allows pinot-segment-local to access the dynamically configured mode without
+ * depending on pinot-core.
+ *
+ * The supplier is registered by UpsertInconsistentStateConfig during server/controller startup.
+ */
+public final class ConsumingSegmentCommitModeProvider {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentCommitModeProvider.class);
+ private static final Supplier<Mode> DEFAULT_SUPPLIER = () -> Mode.NONE;
+
+ public enum Mode {
+ /**
+ * Reload is disabled for tables with inconsistent state configurations.
+ * Safe option that prevents potential data inconsistency issues.
+ */
+ NONE(false),
+
+ /**
+ * Reload is enabled but tables with partial upsert or dropOutOfOrderRecord=true (with replication > 1) will be
+ * skipped. When inconsistencies are detected during reload/force commit, upsert metadata is reverted.
+ */
+ PROTECTED(true),
+
+ /**
+ * Reload is enabled for all tables regardless of their configuration.
+ * Use with caution as this may cause data inconsistency for partial-upsert tables
+ * or upsert tables with dropOutOfOrderRecord/ outOfOrderRecordColumn enabled when replication > 1.
+ * Inconsistency checks and metadata revert are skipped.
+ */
+ UNSAFE(true);
+
+ private final boolean _reloadEnabled;
+
+ Mode(boolean reloadEnabled) {
+ _reloadEnabled = reloadEnabled;
+ }
+
+ public boolean isReloadEnabled() {
+ return _reloadEnabled;
+ }
+
+ public boolean isUnsafe() {
+ return this == UNSAFE;
+ }
+
+ public boolean isProtected() {
+ return this == PROTECTED;
+ }
+
+ /**
+ * Parses a string value to Mode.
+ * Supports case-insensitive matching and also legacy boolean values for backward compatibility.
+ *
+ * @param value the string value to parse
+ * @param defaultMode the default mode to return if value is null or invalid
+ * @return the parsed Mode
+ */
+ public static Mode fromString(String value, Mode defaultMode) {
+ if (value == null || value.trim().isEmpty()) {
+ return defaultMode;
+ }
+
+ String trimmedValue = value.trim().toUpperCase();
+
+ // Try to match enum name directly
+ for (Mode mode : values()) {
+ if (mode.name().equals(trimmedValue)) {
+ return mode;
+ }
+ }
+
+ // Support legacy boolean values for backward compatibility
+ if ("TRUE".equals(trimmedValue)) {
+ return PROTECTED;
+ }
+ if ("FALSE".equals(trimmedValue)) {
+ return NONE;
+ }
+
+ return defaultMode;
+ }
+ }
+
+ private static volatile Supplier<Mode> _modeSupplier = DEFAULT_SUPPLIER;
+ private static volatile boolean _registered = false;
+
+ private ConsumingSegmentCommitModeProvider() {
+ }
+
+ public static void register(Supplier<Mode> modeSupplier) {
Review Comment:
I don't think this provider is needed. It seems you want a singleton class for the supplier, but `UpsertInconsistentStateConfig` is already a singleton class.
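A rough sketch of the reviewer's point: a single singleton can own the mode directly, so no separate static provider with a registered `Supplier` is needed. `InconsistentStateModeHolder` and `setMode` are hypothetical names; `setMode` stands in for the cluster config change callback, which is omitted. The sketch does not address module placement; it assumes the singleton lives somewhere its callers can already depend on. The initial `NONE` mirrors the provider's `DEFAULT_SUPPLIER` in the diff.

```java
import java.util.concurrent.atomic.AtomicReference;

enum Mode {
  NONE, PROTECTED, UNSAFE
}

/** Hypothetical singleton that holds the current mode itself, with no Supplier indirection. */
final class InconsistentStateModeHolder {
  private static final InconsistentStateModeHolder INSTANCE = new InconsistentStateModeHolder();

  // Thread-safe holder; initial value mirrors DEFAULT_SUPPLIER (Mode.NONE) in the diff.
  private final AtomicReference<Mode> _mode = new AtomicReference<>(Mode.NONE);

  private InconsistentStateModeHolder() {
  }

  public static InconsistentStateModeHolder getInstance() {
    return INSTANCE;
  }

  public Mode getMode() {
    return _mode.get();
  }

  // Hypothetical stand-in for the cluster config change handler.
  void setMode(Mode mode) {
    _mode.set(mode);
  }
}
```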
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2171,12 +2171,33 @@ public static class ConfigChangeListenerConstants {
* Cluster config key to control whether force commit/reload is allowed for upsert tables
* with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
* with consistency mode NONE and replication > 1).
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ * @deprecated Use {@link #CONSUMING_SEGMENT_COMMIT_CONFIG} instead.
*/
+ @Deprecated
public static final String FORCE_COMMIT_RELOAD_CONFIG = "pinot.server.upsert.force.commit.reload";
+ /**
+ * Cluster config key to control the commit mode for consuming segments in upsert tables.
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
Review Comment:
We don't need to support old boolean values (the key is newly added, so there is no legacy behavior). Let's simplify it.
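A sketch of the simplified parser, assuming only the three mode names are accepted and any other value (including `true`/`false`) falls back to the caller's default. It leans on the built-in `Enum.valueOf`, which throws `IllegalArgumentException` for unknown names, instead of the manual loop in the diff.

```java
import java.util.Locale;

enum Mode {
  NONE, PROTECTED, UNSAFE;

  /**
   * Parses a mode name case-insensitively. Legacy boolean values are not accepted;
   * null, blank, or unknown values return defaultMode.
   */
  static Mode fromString(String value, Mode defaultMode) {
    if (value == null || value.trim().isEmpty()) {
      return defaultMode;
    }
    try {
      // Locale.ROOT keeps upper-casing stable regardless of the JVM default locale.
      return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return defaultMode;
    }
  }
}
```

Using `Locale.ROOT` avoids surprises under locales such as Turkish, where the default `toUpperCase()` maps `i` differently.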
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2171,12 +2171,33 @@ public static class ConfigChangeListenerConstants {
* Cluster config key to control whether force commit/reload is allowed for upsert tables
* with inconsistent state configurations (partial upsert or dropOutOfOrderRecord=true
* with consistency mode NONE and replication > 1).
+ *
+ * Supported values: NONE, PROTECTED, UNSAFE
+ * Legacy boolean values (true/false) are also supported for backward compatibility.
+ * @deprecated Use {@link #CONSUMING_SEGMENT_COMMIT_CONFIG} instead.
*/
+ @Deprecated
Review Comment:
I don't see any logic handling this deprecated key. Given that it is newly added, we can just remove it.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java:
##########
@@ -41,62 +40,54 @@ public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeLi
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig();
- private final AtomicBoolean _forceCommitReloadEnabled =
- new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
+ private final AtomicReference<Mode> _forceCommitReloadMode = new AtomicReference<>(
+ Mode.fromString(ConfigChangeListenerConstants.DEFAULT_CONSUMING_COMMIT_MODE, Mode.NONE));
Review Comment:
You can directly define `DEFAULT_CONSUMING_COMMIT_MODE` as a `Mode`. Why is the fallback `NONE` instead of the default?
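A sketch of both suggestions together: the default constant is typed as the enum, so the field initializer needs no parsing, and an invalid or missing config value falls back to that default rather than to `NONE`. `ModeDefaults`, `ModeHolder`, and `onConfigValue` are hypothetical names. `PROTECTED` as the default is an assumption, chosen only because the PR maps the old default (`true`) to `PROTECTED`.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;

enum Mode {
  NONE, PROTECTED, UNSAFE
}

final class ModeDefaults {
  // Typed as the enum; PROTECTED is an assumed value for illustration.
  static final Mode DEFAULT_MODE = Mode.PROTECTED;

  private ModeDefaults() {
  }
}

final class ModeHolder {
  // Initialized straight from the typed default, no string parsing needed.
  private final AtomicReference<Mode> _mode = new AtomicReference<>(ModeDefaults.DEFAULT_MODE);

  Mode getMode() {
    return _mode.get();
  }

  /** Applies a new config value; a missing or invalid value resets to the default. */
  void onConfigValue(String value) {
    _mode.set(parseOrDefault(value));
  }

  static Mode parseOrDefault(String value) {
    if (value == null || value.trim().isEmpty()) {
      return ModeDefaults.DEFAULT_MODE;
    }
    try {
      return Mode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Fall back to the declared default, not to a hard-coded NONE.
      return ModeDefaults.DEFAULT_MODE;
    }
  }
}
```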
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/ConsumingSegmentCommitModeProvider.java:
##########
@@ -0,0 +1,134 @@
+package org.apache.pinot.spi.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provider for accessing the current Consuming segment Commit Mode from anywhere in the codebase.
+ * This allows pinot-segment-local to access the dynamically configured mode without
+ * depending on pinot-core.
+ *
+ * The supplier is registered by UpsertInconsistentStateConfig during server/controller startup.
+ */
+public final class ConsumingSegmentCommitModeProvider {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentCommitModeProvider.class);
+ private static final Supplier<Mode> DEFAULT_SUPPLIER = () -> Mode.NONE;
+
+ public enum Mode {
+ /**
+ * Reload is disabled for tables with inconsistent state configurations.
+ * Safe option that prevents potential data inconsistency issues.
+ */
+ NONE(false),
+
+ /**
+ * Reload is enabled but tables with partial upsert or dropOutOfOrderRecord=true (with replication > 1) will be
+ * skipped. When inconsistencies are detected during reload/force commit, upsert metadata is reverted.
+ */
+ PROTECTED(true),
+
+ /**
+ * Reload is enabled for all tables regardless of their configuration.
+ * Use with caution as this may cause data inconsistency for partial-upsert tables
+ * or upsert tables with dropOutOfOrderRecord/ outOfOrderRecordColumn enabled when replication > 1.
+ * Inconsistency checks and metadata revert are skipped.
+ */
+ UNSAFE(true);
+
+ private final boolean _reloadEnabled;
+
+ Mode(boolean reloadEnabled) {
+ _reloadEnabled = reloadEnabled;
+ }
+
+ public boolean isReloadEnabled() {
+ return _reloadEnabled;
+ }
+
+ public boolean isUnsafe() {
+ return this == UNSAFE;
+ }
+
+ public boolean isProtected() {
+ return this == PROTECTED;
+ }
Review Comment:
(nit) These 2 methods are redundant. It is clearer to use `==` directly on the caller side.
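A sketch of what "`==` on the caller side" might look like, with no `isUnsafe()`/`isProtected()` helpers on the enum. `ForceCommitHandler.describeForceCommit` is a hypothetical call site; the returned strings only paraphrase the mode descriptions from the diff. Comparing enum constants with `==` is identity-based and safe, since each constant is a singleton.

```java
enum Mode {
  NONE, PROTECTED, UNSAFE
}

final class ForceCommitHandler {
  private ForceCommitHandler() {
  }

  /** Hypothetical call site that branches on the mode with == instead of helper methods. */
  static String describeForceCommit(Mode mode) {
    if (mode == Mode.NONE) {
      return "force commit blocked for tables with inconsistent state configs";
    }
    if (mode == Mode.UNSAFE) {
      return "force commit allowed; inconsistency checks and metadata revert skipped";
    }
    // Remaining case: PROTECTED.
    return "force commit allowed; upsert metadata reverted if inconsistencies are detected";
  }
}
```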
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertInconsistentStateConfig.java:
##########
@@ -41,62 +40,54 @@ public class UpsertInconsistentStateConfig implements PinotClusterConfigChangeLi
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertInconsistentStateConfig.class);
private static final UpsertInconsistentStateConfig INSTANCE = new UpsertInconsistentStateConfig();
- private final AtomicBoolean _forceCommitReloadEnabled =
- new AtomicBoolean(ConfigChangeListenerConstants.DEFAULT_FORCE_COMMIT_RELOAD);
+ private final AtomicReference<Mode> _forceCommitReloadMode = new AtomicReference<>(
+ Mode.fromString(ConfigChangeListenerConstants.DEFAULT_CONSUMING_COMMIT_MODE, Mode.NONE));
private UpsertInconsistentStateConfig() {
+ // Register this instance as the provider so pinot-segment-local can access the mode directly
+ ConsumingSegmentCommitModeProvider.register(this::getForceCommitReloadMode);
}
public static UpsertInconsistentStateConfig getInstance() {
return INSTANCE;
}
/**
- * Checks if force commit/reload is allowed for the given table config.
+ * Checks if force commit/reload is allowed based on the current mode.
*
- * @param tableConfig the table config to check, may be null
- * @return true if force commit/reload is allowed (either globally enabled or table has no inconsistent configs)
+ * @return true if force commit/reload is allowed (mode is PROTECTED or UNSAFE)
*/
- public boolean isForceCommitReloadAllowed(@Nullable TableConfig tableConfig) {
- if (tableConfig == null) {
- return false;
- }
- if (_forceCommitReloadEnabled.get()) {
- return true;
- }
- // Allow if table doesn't have inconsistent state configs
- return !TableConfigUtils.checkForInconsistentStateConfigs(tableConfig);
+ public boolean isForceCommitReloadAllowed() {
Review Comment:
This is attached to force commit, not reload. Let's rename it to `isForceCommitAllowed()`.
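A sketch of the renamed check on the config class, assuming it keeps the rule documented in the diff ("allowed when the mode is PROTECTED or UNSAFE"). `ForceCommitConfig` is a hypothetical stand-in that takes its initial mode through a constructor so it is easy to exercise; the real class is a singleton updated by cluster config changes, which is not modeled here.

```java
import java.util.concurrent.atomic.AtomicReference;

enum Mode {
  NONE, PROTECTED, UNSAFE
}

/** Hypothetical stand-in for the config singleton, showing only the renamed check. */
final class ForceCommitConfig {
  private final AtomicReference<Mode> _mode;

  ForceCommitConfig(Mode initialMode) {
    _mode = new AtomicReference<>(initialMode);
  }

  void setMode(Mode mode) {
    _mode.set(mode);
  }

  /**
   * Renamed from isForceCommitReloadAllowed(): the mode only gates force commit.
   * Allowed when the mode is PROTECTED or UNSAFE, i.e. anything other than NONE.
   */
  boolean isForceCommitAllowed() {
    return _mode.get() != Mode.NONE;
  }
}
```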
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.