This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9f8877d521c Do not allow reload/force commit on a consuming segment for
an Upsert Table with dropOutOfOrderRecord=true (#17251)
9f8877d521c is described below
commit 9f8877d521ccf180bd2beba8f53621b172488aec
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Sun Nov 23 15:09:41 2025 -0800
Do not allow reload/force commit on a consuming segment for an Upsert Table
with dropOutOfOrderRecord=true (#17251)
* Add additional config check for data inconsistencies during reload/force
commit operations
* Add a new metric (REALTIME_UPSERT_INCONSISTENT_ROWS) for upsert tables with
dropOutOfOrderRecord enabled and consistency mode NONE
* Update tests
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 15 ++++----
.../PinotLLCRealtimeSegmentManagerTest.java | 42 +++++++++++++++++++---
.../core/data/manager/BaseTableDataManager.java | 17 +++++----
.../realtime/RealtimeSegmentDataManager.java | 24 ++++++++-----
.../upsert/BasePartitionUpsertMetadataManager.java | 18 ++++++++--
.../segment/local/utils/TableConfigUtils.java | 8 +++--
.../starter/helix/HelixInstanceDataManager.java | 19 ++++++----
8 files changed, 106 insertions(+), 38 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 3cd65435bcb..f0b304feb69 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -55,6 +55,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
REALTIME_ROWS_AHEAD_OF_ZK("rows", false),
+ REALTIME_UPSERT_INCONSISTENT_ROWS("rows", false),
// number of times partition of a record did not match the partition of the
stream
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a9ce6b0263c..7ab39ae5be4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2547,15 +2547,18 @@ public class PinotLLCRealtimeSegmentManager {
}
private void sendForceCommitMessageToServers(String tableNameWithType,
Set<String> consumingSegments) {
- // For partial upsert tables, force-committing consuming segments is
disabled.
- // In some cases (especially when replication > 1), the server with fewer
consumed rows
- // was incorrectly chosen as the winner, causing other servers to
reconsume rows
- // and leading to inconsistent data.
+ // For partial-upsert tables or upserts with out-of-order events enabled,
force-committing
+ // consuming segments is disabled. In some cases (especially when
replication > 1), the
+ // server that consumed fewer rows was incorrectly selected as the winner,
causing other
+ // servers to reconsume rows and resulting in inconsistent data when
previous state must
+ // be referenced for add/update operations.
// TODO: Temporarily disabled until a proper fix is implemented.
TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
- if (TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
throw new IllegalStateException(
- "Force commit is not allowed for partial upsert tables: {} when
replication > 1" + tableNameWithType);
+ "Force commit is not allowed when replication > 1 for partial-upsert
tables, or for upsert tables"
+ + " when dropOutOfOrder is enabled with consistency mode: " +
UpsertConfig.ConsistencyMode.NONE
+ + " for the table: " + tableNameWithType);
}
if (!consumingSegments.isEmpty()) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 28ef538968d..842eea188cb 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -75,6 +75,7 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -355,6 +356,35 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertTrue(committed.isEmpty(), "Expected no segments to be committed when
only non-consuming segments provided");
}
+ @Test
+ public void testForceCommitUpsertWithOutOfOrderTable() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ segmentManager._numReplicas = 2; // RF > 1
+ Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDropOutOfOrderRecord(true);
+ upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+ segmentManager._tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
+ .build();
+ segmentManager._streamConfigs =
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
+
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
+ segmentManager._tableConfig);
+ segmentManager._numInstances = 3;
+ segmentManager.makeConsumingInstancePartitions();
+ segmentManager._numPartitions = 1;
+ segmentManager.setUpNewTable();
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ segmentManager._idealState.setPartitionState(consumingSegment, "Server_0",
SegmentStateModel.CONSUMING);
+ try {
+ segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
ForceCommitBatchConfig.of(1, 1, 5));
+ fail("Expected IllegalStateException for partial upsert table with RF >
1");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Force commit is not allowed when
replication > 1 for partial-upsert tables"),
+ "Exception message should mention partial upsert and replication");
+ }
+ }
+
@Test
public void testForceCommitPartialUpsertTableWithMultipleReplicas() {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
@@ -362,8 +392,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
segmentManager._tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(
- new org.apache.pinot.spi.config.table.UpsertConfig(
-
org.apache.pinot.spi.config.table.UpsertConfig.Mode.PARTIAL)).build();
+ new UpsertConfig(
+ UpsertConfig.Mode.PARTIAL)).build();
segmentManager._streamConfigs =
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
segmentManager._tableConfig);
@@ -377,7 +407,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment,
ForceCommitBatchConfig.of(1, 1, 5));
fail("Expected IllegalStateException for partial upsert table with RF >
1");
} catch (IllegalStateException e) {
- assertTrue(e.getMessage().contains("Force commit is not allowed for
partial upsert tables"),
+ assertTrue(e.getMessage().contains(
+ "Force commit is not allowed when replication > 1 for
partial-upsert tables, or for upsert tables when "
+ + "dropOutOfOrder is enabled with consistency mode: NONE for
the table: testTable_REALTIME"),
"Exception message should mention partial upsert and replication");
}
}
@@ -389,8 +421,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
segmentManager._tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(
- new org.apache.pinot.spi.config.table.UpsertConfig(
-
org.apache.pinot.spi.config.table.UpsertConfig.Mode.PARTIAL)).build();
+ new UpsertConfig(
+ UpsertConfig.Mode.PARTIAL)).build();
segmentManager._streamConfigs =
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
segmentManager._tableConfig);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 9c96cfdef62..5e47960a57c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -819,15 +819,18 @@ public abstract class BaseTableDataManager implements
TableDataManager {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
// Use force commit to reload consuming segment
if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
- // For partial upsert tables, force-committing consuming segments is
disabled.
- // In some cases (especially when replication > 1), the server with
fewer consumed rows
- // was incorrectly chosen as the winner, causing other servers to
reconsume rows
- // and leading to inconsistent data.
+ // For partial-upsert tables or upserts with out-of-order events
enabled, force-committing
+ // consuming segments is disabled. In some cases (especially when
replication > 1), the
+ // server that consumed fewer rows was incorrectly selected as the
winner, causing other
+ // servers to reconsume rows and resulting in inconsistent data when
previous state must
+ // be referenced for add/update operations.
// TODO: Temporarily disabled until a proper fix is implemented.
TableConfig tableConfig = indexLoadingConfig.getTableConfig();
- if (TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
- _logger.warn("Skipping reload (force committing) on consuming
segment: {} for a Partial Upsert Table with "
- + "replication > 1", segmentName);
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ _logger.warn(
+ "Skipping reload (force committing) on consuming segment: {} for
a Partial Upsert Table/ upsert tables "
+ + "when dropOutOfOrder is enabled with no consistency mode",
+ segmentName);
} else {
_logger.info("Reloading (force committing) consuming segment: {}",
segmentName);
((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index b8e2ce9097e..c8bff45ad13 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1508,14 +1508,22 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
// Allow to catch up upto final offset, and then replace.
if (_currentOffset.compareTo(endOffset) > 0) {
- // For a partial upsert table, if a server consumed data ahead of
committed offset(i.e w.r.t winning
- // server) it can cause data inconsistencies and data correctness
problems. For example,
- // during reload/ force commit / pause & resume consumption
operations a server with lowest consumed rows
- // can be chosen as a winner if controller hasn't received
messages from other servers. In such situations,
- // the record location in the metadata would be left dangling
considering those primary keys as first
- // rather than merging it with previous entry
- if (_realtimeTableDataManager.isPartialUpsertEnabled()) {
- _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
+ // For partial-upsert tables or upserts with out-of-order events
enabled, force-committing
+ // consuming segments is disabled. In certain cases (especially
when replication > 1),
+ // the server that has consumed fewer rows may be incorrectly
selected as the winner.
+ // This causes other servers to reconsume rows, which can lead to
inconsistent data
+ // when previous states must be referenced to add or update rows.
+ // In these scenarios, the record location stored in the metadata
can become stale or
+ // incorrect. Primary keys may be treated as new instead of being
merged with existing
+ // entries, and records that should have been dropped during
reconsumption may not be
+ // dropped at all.
+ if (_realtimeTableDataManager.isUpsertEnabled()
+ && _realtimeTableDataManager.getTableUpsertMetadataManager()
!= null) {
+ UpsertContext context =
_realtimeTableDataManager.getTableUpsertMetadataManager().getContext();
+ if (_realtimeTableDataManager.isPartialUpsertEnabled() ||
(context.isDropOutOfOrderRecord()
+ && context.getConsistencyMode() ==
UpsertConfig.ConsistencyMode.NONE)) {
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
+ }
}
// We moved ahead of the offset that is committed in ZK.
_segmentLogger.warn("Current offset {} ahead of the offset in zk
{}. Downloading to replace",
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 5373db37fa0..fdd4b1bb62e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -660,11 +660,25 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- if (_partialUpsertHandler != null) {
+ // Add the new metric tracking here
+ if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode()
== UpsertConfig.ConsistencyMode.NONE) {
+ // For Upsert tables when some of the records get dropped when
dropOutOfOrderRecord is enabled, we donot
+ // store the original record location when keys are not replaced, this
can potentially cause inconsistencies
+ // leading to some rows not getting dropped when reconsumed. This can
be caused when a consuming segment
+ // that is consumed from a different server is replaced with the
existing segment which consumed rows ahead
+ // of the other server
+ _logger.warn(
+ "Found {} primary keys not replaced when replacing segment: {} for
upsert table with dropOutOfOrderRecord"
+ + " enabled with no consistency mode. This can potentially
cause inconsistency between replicas",
+ numKeysNotReplaced, segmentName);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+ numKeysNotReplaced);
+ } else if (_partialUpsertHandler != null) {
// For partial-upsert table, because we do not restore the original
record location when removing the primary
// keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
// consuming segment is replaced by a committed segment that is
consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming
the messages in the same order).
+ // different records (some stream consumer cannot guarantee consuming
the messages in the same order/
+ // when a segment is replaced with lesser consumed rows from the other
server).
_logger.warn("Found {} primary keys not replaced when replacing
segment: {} for partial-upsert table. This "
+ "can potentially cause inconsistency between replicas",
numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 51fb7c505c7..013f41adb4c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1594,9 +1594,11 @@ public final class TableConfigUtils {
}
}
- public static boolean checkForPartialUpsertWithReplicas(TableConfig
tableConfig) {
- return tableConfig != null && tableConfig.getReplication() > 1 &&
tableConfig.getUpsertConfig() != null
- && tableConfig.getUpsertConfig().getMode() ==
UpsertConfig.Mode.PARTIAL;
+ public static boolean checkForInconsistentStateConfigs(TableConfig
tableConfig) {
+ return tableConfig != null && tableConfig.getUpsertConfig() != null &&
tableConfig.getReplication() > 1 && (
+ tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL
|| (
+ tableConfig.getUpsertConfig().isDropOutOfOrderRecord()
+ && tableConfig.getUpsertConfig().getConsistencyMode() ==
UpsertConfig.ConsistencyMode.NONE));
}
// enum of all the skip-able validation types.
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index ddb3c488b68..edead05bd58 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -69,6 +69,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -552,15 +553,19 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
if (segmentDataManager != null) {
try {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
- // For partial upsert tables, force-committing consuming
segments is disabled.
- // In some cases (especially when replication > 1), the server
with fewer consumed rows
- // was incorrectly chosen as the winner, causing other servers
to reconsume rows
- // and leading to inconsistent data.
+ // For partial-upsert tables or upserts with out-of-order events
enabled, force-committing
+ // consuming segments is disabled. In some cases (especially
when replication > 1), the
+ // server that consumed fewer rows was incorrectly selected as
the winner, causing other
+ // servers to reconsume rows and resulting in inconsistent data
when previous state must
+ // be referenced for add/update operations.
+
// TODO: Temporarily disabled until a proper fix is implemented.
TableConfig tableConfig =
tableDataManager.getCachedTableConfigAndSchema().getLeft();
- if
(TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
- LOGGER.warn("Force commit is not allowed on a Partial Upsert
Table: {} when replication > 1",
- tableNameWithType);
+ if
(TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ LOGGER.warn(
+ "Force commit is not allowed when replication > 1 for
partial-upsert tables, or for upsert tables"
+ + " when dropOutOfOrder is enabled with consistency
mode: {}" + "for the table: {} ",
+ UpsertConfig.ConsistencyMode.NONE, tableNameWithType);
} else {
((RealtimeSegmentDataManager)
segmentDataManager).forceCommit();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]