This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d0bf62417 Deterministically order dimension-table segments during lookup loading (#17536)
19d0bf62417 is described below

commit 19d0bf6241799af58e3b482ba6bc4f329d4a4c35
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Jan 25 12:17:33 2026 +0800

    Deterministically order dimension-table segments during lookup loading (#17536)
---
 .../manager/offline/DimensionTableDataManager.java | 23 ++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index b3f1589de16..a1084f02367 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -26,6 +26,7 @@ import 
it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -213,6 +214,9 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+    if (!_errorOnDuplicatePrimaryKey) {
+      sortSegmentsForUpsert(segmentDataManagers);
+    }
     try {
       // count all documents to limit map re-sizings
       int totalDocs = 0;
@@ -249,7 +253,7 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
               if (_errorOnDuplicatePrimaryKey && previousValue != null) {
                 throw new IllegalStateException(
                     "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName()
-                        + "primary key already exist for: " + 
Arrays.toString(primaryKey));
+                        + " primary key already exists for: " + 
Arrays.toString(primaryKey));
               }
             }
           } catch (Exception e) {
@@ -288,6 +292,9 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+    if (!_errorOnDuplicatePrimaryKey) {
+      sortSegmentsForUpsert(segmentDataManagers);
+    }
     List<PinotSegmentRecordReader> recordReaders = new 
ArrayList<>(segmentDataManagers.size());
 
     int totalDocs = 0;
@@ -320,7 +327,12 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
             Object[] primaryKey = recordReader.getRecordValues(i, pkIndexes);
 
             long readerIdxAndDocId = (((long) readerIdx) << 32) | (i & 
0xffffffffL);
-            lookupTable.put(primaryKey, readerIdxAndDocId);
+            long previousValue = lookupTable.put(primaryKey, 
readerIdxAndDocId);
+            if (_errorOnDuplicatePrimaryKey && previousValue != 
Long.MIN_VALUE) {
+              throw new IllegalStateException(
+                  "Caught exception while reading records from segment: " + 
indexSegment.getSegmentName()
+                      + " primary key already exists for: " + 
Arrays.toString(primaryKey));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException(
@@ -332,6 +344,13 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
         this);
   }
 
+  private void sortSegmentsForUpsert(List<SegmentDataManager> 
segmentDataManagers) {
+    segmentDataManagers.sort(Comparator
+        .comparingLong((SegmentDataManager segmentDataManager) -> 
segmentDataManager.getSegment().getSegmentMetadata()
+            .getIndexCreationTime())
+        .thenComparing(segmentDataManager -> 
segmentDataManager.getSegment().getSegmentName()));
+  }
+
   private void releaseResources(List<PinotSegmentRecordReader> recordReaders,
       List<SegmentDataManager> segmentDataManagers) {
     for (PinotSegmentRecordReader reader : recordReaders) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to