This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f8877d521c Do not allow reload/force commit on a consuming segment for 
an upsert table with dropOutOfOrderRecord=true (#17251)
9f8877d521c is described below

commit 9f8877d521ccf180bd2beba8f53621b172488aec
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Sun Nov 23 15:09:41 2025 -0800

    Do not allow reload/force commit on a consuming segment for an upsert table 
with dropOutOfOrderRecord=true (#17251)
    
    * Add additional config check for data inconsistencies during reload/force 
commit operations
    
    * Add a new metric for upsert tables with dropOutOfOrderRecord enabled and 
consistency mode NONE
    
    * Update tests
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 15 ++++----
 .../PinotLLCRealtimeSegmentManagerTest.java        | 42 +++++++++++++++++++---
 .../core/data/manager/BaseTableDataManager.java    | 17 +++++----
 .../realtime/RealtimeSegmentDataManager.java       | 24 ++++++++-----
 .../upsert/BasePartitionUpsertMetadataManager.java | 18 ++++++++--
 .../segment/local/utils/TableConfigUtils.java      |  8 +++--
 .../starter/helix/HelixInstanceDataManager.java    | 19 ++++++----
 8 files changed, 106 insertions(+), 38 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 3cd65435bcb..f0b304feb69 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -55,6 +55,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
   STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
   REALTIME_ROWS_AHEAD_OF_ZK("rows", false),
+  REALTIME_UPSERT_INCONSISTENT_ROWS("rows", false),
   // number of times partition of a record did not match the partition of the 
stream
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a9ce6b0263c..7ab39ae5be4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2547,15 +2547,18 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private void sendForceCommitMessageToServers(String tableNameWithType, 
Set<String> consumingSegments) {
-    // For partial upsert tables, force-committing consuming segments is 
disabled.
-    // In some cases (especially when replication > 1), the server with fewer 
consumed rows
-    // was incorrectly chosen as the winner, causing other servers to 
reconsume rows
-    // and leading to inconsistent data.
+    // For partial-upsert tables or upserts with out-of-order events enabled, 
force-committing
+    // consuming segments is disabled. In some cases (especially when 
replication > 1), the
+    // server that consumed fewer rows was incorrectly selected as the winner, 
causing other
+    // servers to reconsume rows and resulting in inconsistent data when 
previous state must
+    // be referenced for add/update operations.
     // TODO: Temporarily disabled until a proper fix is implemented.
     TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
-    if (TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
+    if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
       throw new IllegalStateException(
-          "Force commit is not allowed for partial upsert tables: {} when 
replication > 1" + tableNameWithType);
+          "Force commit is not allowed when replication > 1 for partial-upsert 
tables, or for upsert tables"
+              + " when dropOutOfOrder is enabled with consistency mode: " + 
UpsertConfig.ConsistencyMode.NONE
+              + " for the table: " + tableNameWithType);
     }
 
     if (!consumingSegments.isEmpty()) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 28ef538968d..842eea188cb 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -75,6 +75,7 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -355,6 +356,35 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertTrue(committed.isEmpty(), "Expected no segments to be committed when 
only non-consuming segments provided");
   }
 
+  @Test
+  public void testForceCommitUpsertWithOutOfOrderTable() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numReplicas = 2; // RF > 1
+    Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDropOutOfOrderRecord(true);
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+    segmentManager._tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(upsertConfig)
+        .build();
+    segmentManager._streamConfigs = 
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
+    
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
+        segmentManager._tableConfig);
+    segmentManager._numInstances = 3;
+    segmentManager.makeConsumingInstancePartitions();
+    segmentManager._numPartitions = 1;
+    segmentManager.setUpNewTable();
+    String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+    segmentManager._idealState.setPartitionState(consumingSegment, "Server_0", 
SegmentStateModel.CONSUMING);
+    try {
+      segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
ForceCommitBatchConfig.of(1, 1, 5));
+      fail("Expected IllegalStateException for partial upsert table with RF > 
1");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Force commit is not allowed when 
replication > 1 for partial-upsert tables"),
+          "Exception message should mention partial upsert and replication");
+    }
+  }
+
   @Test
   public void testForceCommitPartialUpsertTableWithMultipleReplicas() {
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
@@ -362,8 +392,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     segmentManager._tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
         
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(
-            new org.apache.pinot.spi.config.table.UpsertConfig(
-                
org.apache.pinot.spi.config.table.UpsertConfig.Mode.PARTIAL)).build();
+            new UpsertConfig(
+                UpsertConfig.Mode.PARTIAL)).build();
     segmentManager._streamConfigs = 
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
     
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
         segmentManager._tableConfig);
@@ -377,7 +407,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
       segmentManager.forceCommit(REALTIME_TABLE_NAME, null, consumingSegment, 
ForceCommitBatchConfig.of(1, 1, 5));
       fail("Expected IllegalStateException for partial upsert table with RF > 
1");
     } catch (IllegalStateException e) {
-      assertTrue(e.getMessage().contains("Force commit is not allowed for 
partial upsert tables"),
+      assertTrue(e.getMessage().contains(
+              "Force commit is not allowed when replication > 1 for 
partial-upsert tables, or for upsert tables when "
+                  + "dropOutOfOrder is enabled with consistency mode: NONE for 
the table: testTable_REALTIME"),
           "Exception message should mention partial upsert and replication");
     }
   }
@@ -389,8 +421,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, String> streamConfigs = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     segmentManager._tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
         
.setNumReplicas(segmentManager._numReplicas).setStreamConfigs(streamConfigs).setUpsertConfig(
-            new org.apache.pinot.spi.config.table.UpsertConfig(
-                
org.apache.pinot.spi.config.table.UpsertConfig.Mode.PARTIAL)).build();
+            new UpsertConfig(
+                UpsertConfig.Mode.PARTIAL)).build();
     segmentManager._streamConfigs = 
IngestionConfigUtils.getStreamConfigs(segmentManager._tableConfig);
     
when(segmentManager._mockResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(
         segmentManager._tableConfig);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 9c96cfdef62..5e47960a57c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -819,15 +819,18 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     if (segmentDataManager instanceof RealtimeSegmentDataManager) {
       // Use force commit to reload consuming segment
       if (_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
-        // For partial upsert tables, force-committing consuming segments is 
disabled.
-        // In some cases (especially when replication > 1), the server with 
fewer consumed rows
-        // was incorrectly chosen as the winner, causing other servers to 
reconsume rows
-        // and leading to inconsistent data.
+        // For partial-upsert tables or upserts with out-of-order events 
enabled, force-committing
+        // consuming segments is disabled. In some cases (especially when 
replication > 1), the
+        // server that consumed fewer rows was incorrectly selected as the 
winner, causing other
+        // servers to reconsume rows and resulting in inconsistent data when 
previous state must
+        // be referenced for add/update operations.
         // TODO: Temporarily disabled until a proper fix is implemented.
         TableConfig tableConfig = indexLoadingConfig.getTableConfig();
-        if (TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
-          _logger.warn("Skipping reload (force committing) on consuming 
segment: {} for a Partial Upsert Table with "
-              + "replication > 1", segmentName);
+        if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+          _logger.warn(
+              "Skipping reload (force committing) on consuming segment: {} for 
a Partial Upsert Table/ upsert tables "
+                  + "when dropOutOfOrder is enabled with no consistency mode",
+              segmentName);
         } else {
           _logger.info("Reloading (force committing) consuming segment: {}", 
segmentName);
           ((RealtimeSegmentDataManager) segmentDataManager).forceCommit();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index b8e2ce9097e..c8bff45ad13 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1508,14 +1508,22 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           }
           // Allow to catch up upto final offset, and then replace.
           if (_currentOffset.compareTo(endOffset) > 0) {
-            // For a partial upsert table, if a server consumed data ahead of 
committed offset(i.e w.r.t winning
-            // server) it can cause data inconsistencies and data correctness 
problems. For example,
-            // during reload/ force commit / pause & resume consumption 
operations a server with lowest consumed rows
-            // can be chosen as a winner if controller hasn't received 
messages from other servers. In such situations,
-            // the record location in the metadata would be left dangling 
considering those primary keys as first
-            // rather than merging it with previous entry
-            if (_realtimeTableDataManager.isPartialUpsertEnabled()) {
-              _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
+            // For partial-upsert tables or upserts with out-of-order events 
enabled, force-committing
+            // consuming segments is disabled. In certain cases (especially 
when replication > 1),
+            // the server that has consumed fewer rows may be incorrectly 
selected as the winner.
+            // This causes other servers to reconsume rows, which can lead to 
inconsistent data
+            // when previous states must be referenced to add or update rows.
+            // In these scenarios, the record location stored in the metadata 
can become stale or
+            // incorrect. Primary keys may be treated as new instead of being 
merged with existing
+            // entries, and records that should have been dropped during 
reconsumption may not be
+            // dropped at all.
+            if (_realtimeTableDataManager.isUpsertEnabled()
+                && _realtimeTableDataManager.getTableUpsertMetadataManager() 
!= null) {
+              UpsertContext context = 
_realtimeTableDataManager.getTableUpsertMetadataManager().getContext();
+              if (_realtimeTableDataManager.isPartialUpsertEnabled() || 
(context.isDropOutOfOrderRecord()
+                  && context.getConsistencyMode() == 
UpsertConfig.ConsistencyMode.NONE)) {
+                _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
+              }
             }
             // We moved ahead of the offset that is committed in ZK.
             _segmentLogger.warn("Current offset {} ahead of the offset in zk 
{}. Downloading to replace",
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 5373db37fa0..fdd4b1bb62e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -660,11 +660,25 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty()) {
       int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-      if (_partialUpsertHandler != null) {
+      // Track keys not replaced for upsert tables with dropOutOfOrderRecord enabled and consistency mode NONE
+      if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() 
== UpsertConfig.ConsistencyMode.NONE) {
+        // For upsert tables with dropOutOfOrderRecord enabled, we do not restore the original record location
+        // for primary keys that were not replaced, so records that should be dropped as out-of-order may not be
+        // dropped when rows are reconsumed, causing inconsistencies between replicas. This can happen when a
+        // consuming segment that consumed rows ahead of the other replicas is replaced by a committed segment
+        // consumed on a different server.
+        _logger.warn(
+            "Found {} primary keys not replaced when replacing segment: {} for 
upsert table with dropOutOfOrderRecord"
+                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas",
+            numKeysNotReplaced, segmentName);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+            numKeysNotReplaced);
+      } else if (_partialUpsertHandler != null) {
         // For partial-upsert table, because we do not restore the original 
record location when removing the primary
         // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
         // consuming segment is replaced by a committed segment that is 
consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming 
the messages in the same order).
+        // different records (some stream consumers cannot guarantee consuming the messages in the same order), or
+        // when it is replaced by a segment from another server that consumed fewer rows.
         _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
             + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 51fb7c505c7..013f41adb4c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1594,9 +1594,11 @@ public final class TableConfigUtils {
     }
   }
 
-  public static boolean checkForPartialUpsertWithReplicas(TableConfig 
tableConfig) {
-    return tableConfig != null && tableConfig.getReplication() > 1 && 
tableConfig.getUpsertConfig() != null
-        && tableConfig.getUpsertConfig().getMode() == 
UpsertConfig.Mode.PARTIAL;
+  public static boolean checkForInconsistentStateConfigs(TableConfig 
tableConfig) {
+    return tableConfig != null && tableConfig.getUpsertConfig() != null && 
tableConfig.getReplication() > 1 && (
+        tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL 
|| (
+            tableConfig.getUpsertConfig().isDropOutOfOrderRecord()
+                && tableConfig.getUpsertConfig().getConsistencyMode() == 
UpsertConfig.ConsistencyMode.NONE));
   }
 
   // enum of all the skip-able validation types.
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index ddb3c488b68..edead05bd58 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -69,6 +69,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -552,15 +553,19 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
         if (segmentDataManager != null) {
           try {
             if (segmentDataManager instanceof RealtimeSegmentDataManager) {
-              // For partial upsert tables, force-committing consuming 
segments is disabled.
-              // In some cases (especially when replication > 1), the server 
with fewer consumed rows
-              // was incorrectly chosen as the winner, causing other servers 
to reconsume rows
-              // and leading to inconsistent data.
+              // For partial-upsert tables or upserts with out-of-order events 
enabled, force-committing
+              // consuming segments is disabled. In some cases (especially 
when replication > 1), the
+              // server that consumed fewer rows was incorrectly selected as 
the winner, causing other
+              // servers to reconsume rows and resulting in inconsistent data 
when previous state must
+              // be referenced for add/update operations.
+
               // TODO: Temporarily disabled until a proper fix is implemented.
               TableConfig tableConfig = 
tableDataManager.getCachedTableConfigAndSchema().getLeft();
-              if 
(TableConfigUtils.checkForPartialUpsertWithReplicas(tableConfig)) {
-                LOGGER.warn("Force commit is not allowed on a Partial Upsert 
Table: {} when replication > 1",
-                    tableNameWithType);
+              if 
(TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+                LOGGER.warn(
+                    "Force commit is not allowed when replication > 1 for 
partial-upsert tables, or for upsert tables"
+                        + " when dropOutOfOrder is enabled with consistency 
mode: {}" + "for the table: {} ",
+                    UpsertConfig.ConsistencyMode.NONE, tableNameWithType);
               } else {
                 ((RealtimeSegmentDataManager) 
segmentDataManager).forceCommit();
               }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to