This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cc278d232cf Add auto complete flag to Lineage entry (#18584)
cc278d232cf is described below

commit cc278d232cfe1d0b2905a71f9be9ebe2a3fb58e7
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Jun 4 02:23:35 2026 +0530

    Add auto complete flag to Lineage entry (#18584)
---
 .../apache/pinot/common/lineage/LineageEntry.java  | 22 +++++++++--
 .../pinot/common/lineage/SegmentLineage.java       | 15 +++++--
 .../pinot/common/lineage/SegmentLineageTest.java   | 46 ++++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  7 ++--
 4 files changed, 81 insertions(+), 9 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
index e008b499db1..7a018b91fbd 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
@@ -31,12 +31,24 @@ public class LineageEntry {
   private final List<String> _segmentsTo;
   private final LineageEntryState _state;
   private final long _timestamp;
+  // Currently, lineage needs to be marked completed by clients be calling 
{@code endReplaceSegments}.
+  // For few use cases, we can simplify the client logic by allowing an 
observer (e.g. a controller-side periodic task)
+  // to mark lineage entries completed automatically once all the To segments 
are ONLINE in the external view.
+  // This field allows us to opt in to this behavior on a per-entry basis.
+  // Default false preserves client-driven completion semantics.
+  private final boolean _autoCompleteLineageEntry;
 
   public LineageEntry(List<String> segmentsFrom, List<String> segmentsTo, 
LineageEntryState state, long timestamp) {
+    this(segmentsFrom, segmentsTo, state, timestamp, false);
+  }
+
+  public LineageEntry(List<String> segmentsFrom, List<String> segmentsTo, 
LineageEntryState state, long timestamp,
+      boolean autoCompleteLineageEntry) {
     _segmentsFrom = segmentsFrom;
     _segmentsTo = segmentsTo;
     _state = state;
     _timestamp = timestamp;
+    _autoCompleteLineageEntry = autoCompleteLineageEntry;
   }
 
   public List<String> getSegmentsFrom() {
@@ -55,6 +67,10 @@ public class LineageEntry {
     return _timestamp;
   }
 
+  public boolean isAutoCompleteLineageEntry() {
+    return _autoCompleteLineageEntry;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -64,12 +80,12 @@ public class LineageEntry {
       return false;
     }
     LineageEntry that = (LineageEntry) o;
-    return _timestamp == that._timestamp && 
_segmentsFrom.equals(that._segmentsFrom) && _segmentsTo
-        .equals(that._segmentsTo) && _state == that._state;
+    return _timestamp == that._timestamp && _autoCompleteLineageEntry == 
that._autoCompleteLineageEntry
+        && _segmentsFrom.equals(that._segmentsFrom) && 
_segmentsTo.equals(that._segmentsTo) && _state == that._state;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_segmentsFrom, _segmentsTo, _state, _timestamp);
+    return Objects.hash(_segmentsFrom, _segmentsTo, _state, _timestamp, 
_autoCompleteLineageEntry);
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
index b3776ddc9f2..ad5d533af45 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
@@ -149,12 +149,17 @@ public class SegmentLineage {
     for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
-      Preconditions.checkState(value.size() == 4);
+      // Tolerant read: legacy entries are 4-tuples. Newer entries append a 
5th element
+      // ("autoCompleteLineageEntry") only when true; a missing 5th defaults 
to false. The reader
+      // accepts both formats so older controllers/writers and newer ones can 
interoperate.
+      Preconditions.checkState(value.size() >= 4);
       List<String> segmentsFrom = 
Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR));
       List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), 
COMMA_SEPARATOR));
       LineageEntryState state = LineageEntryState.valueOf(value.get(2));
       long timestamp = Long.parseLong(value.get(3));
-      lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo, 
state, timestamp));
+      boolean autoCompleteLineageEntry = value.size() >= 5 && 
Boolean.parseBoolean(value.get(4));
+      lineageEntries.put(lineageId,
+          new LineageEntry(segmentsFrom, segmentsTo, state, timestamp, 
autoCompleteLineageEntry));
     }
     return new SegmentLineage(tableNameWithType, lineageEntries, customMap);
   }
@@ -171,7 +176,11 @@ public class SegmentLineage {
       String segmentsTo = String.join(COMMA_SEPARATOR, 
lineageEntry.getSegmentsTo());
       String state = lineageEntry.getState().toString();
       String timestamp = Long.toString(lineageEntry.getTimestamp());
-      List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state, 
timestamp);
+      // Omit the 5th element when the flag is the default (false): keeps the 
wire format
+      // identical to legacy 4-tuples for every entry that does not opt into 
observer-driven
+      // completion, so older readers continue to parse them.
+      List<String> listEntry = lineageEntry.isAutoCompleteLineageEntry() ? 
Arrays.asList(segmentsFrom, segmentsTo,
+          state, timestamp, Boolean.toString(true)) : 
Arrays.asList(segmentsFrom, segmentsTo, state, timestamp);
       znRecord.setListField(entry.getKey(), listEntry);
     }
     if (_customMap != null) {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
index 2c61962a3a5..996a216cd06 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
@@ -68,6 +68,20 @@ public class SegmentLineageTest {
     Assert.assertEquals(lineageEntry4.getSegmentsTo(), Arrays.asList("s12"));
     Assert.assertEquals(lineageEntry4.getState(), 
LineageEntryState.IN_PROGRESS);
     Assert.assertEquals(lineageEntry4.getTimestamp(), 44444L);
+    Assert.assertFalse(lineageEntry4.isAutoCompleteLineageEntry());
+
+    // Entry opting into observer-driven completion: should round-trip the 
flag and emit a 5-tuple
+    // in the ZNRecord list field, while the other entries stay 4-tuples.
+    String id5 = SegmentLineageUtils.generateLineageEntryId();
+    segmentLineage.addLineageEntry(id5,
+        new LineageEntry(Arrays.asList("s13", "s14"), Arrays.asList("s15"), 
LineageEntryState.IN_PROGRESS, 55555L,
+            true));
+    LineageEntry lineageEntry5 = segmentLineage.getLineageEntry(id5);
+    Assert.assertEquals(lineageEntry5.getSegmentsFrom(), Arrays.asList("s13", 
"s14"));
+    Assert.assertEquals(lineageEntry5.getSegmentsTo(), Arrays.asList("s15"));
+    Assert.assertEquals(lineageEntry5.getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(lineageEntry5.getTimestamp(), 55555L);
+    Assert.assertTrue(lineageEntry5.isAutoCompleteLineageEntry());
 
     // Test the convesion from the segment lineage to the znRecord
     ZNRecord znRecord = segmentLineage.toZNRecord();
@@ -97,6 +111,15 @@ public class SegmentLineageTest {
     Assert.assertEquals(entry4.get(1), String.join(",", Arrays.asList("s12")));
     Assert.assertEquals(entry4.get(2), 
LineageEntryState.IN_PROGRESS.toString());
     Assert.assertEquals(entry4.get(3), Long.toString(44444L));
+    // Default-false entries must stay 4-tuples on the wire to keep old 
readers happy.
+    Assert.assertEquals(entry4.size(), 4);
+
+    List<String> entry5 = listFields.get(id5);
+    Assert.assertEquals(entry5.get(0), String.join(",", Arrays.asList("s13", 
"s14")));
+    Assert.assertEquals(entry5.get(1), String.join(",", Arrays.asList("s15")));
+    Assert.assertEquals(entry5.get(2), 
LineageEntryState.IN_PROGRESS.toString());
+    Assert.assertEquals(entry5.get(3), Long.toString(55555L));
+    Assert.assertEquals(entry5.get(4), Boolean.toString(true));
 
     // Test the conversion from the znRecord to the segment lineage
     SegmentLineage segmentLineageFromZNRecord = 
SegmentLineage.fromZNRecord(segmentLineage.toZNRecord());
@@ -104,6 +127,7 @@ public class SegmentLineageTest {
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id2), 
lineageEntry2);
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id3), 
lineageEntry3);
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id4), 
lineageEntry4);
+    Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id5), 
lineageEntry5);
 
     // Try to delete by iterating through the lineage entry ids
     for (String lineageId : segmentLineage.getLineageEntryIds()) {
@@ -134,5 +158,27 @@ public class SegmentLineageTest {
     actualLineageEntry =
         new LineageEntry(Arrays.asList("seg1"), Arrays.asList("seg3", "seg4"), 
LineageEntryState.REVERTED, 12345L);
     Assert.assertNotEquals(actualLineageEntry, expectedLineageEntry);
+
+    // Entries that differ only in autoCompleteLineageEntry must not be equal.
+    LineageEntry flaggedEntry =
+        new LineageEntry(Arrays.asList("seg1", "seg2"), Arrays.asList("seg3", 
"seg4"), LineageEntryState.IN_PROGRESS,
+            12345L, true);
+    Assert.assertNotEquals(flaggedEntry, expectedLineageEntry);
+  }
+
+  @Test
+  public void testLegacyFourTupleReadsAsAutoCompleteFalse() {
+    // A ZNRecord written by an older controller (4-tuple list fields) must 
still parse, with the
+    // missing 5th element defaulting to false. Hand-build the ZNRecord to 
simulate the legacy wire
+    // format directly.
+    ZNRecord legacy = new ZNRecord("test_OFFLINE");
+    legacy.setListField("legacy-1", Arrays.asList("a,b", "c", 
LineageEntryState.IN_PROGRESS.toString(), "99999"));
+    SegmentLineage lineage = SegmentLineage.fromZNRecord(legacy);
+    LineageEntry parsed = lineage.getLineageEntry("legacy-1");
+    Assert.assertEquals(parsed.getSegmentsFrom(), Arrays.asList("a", "b"));
+    Assert.assertEquals(parsed.getSegmentsTo(), Arrays.asList("c"));
+    Assert.assertEquals(parsed.getState(), LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(parsed.getTimestamp(), 99999L);
+    Assert.assertFalse(parsed.isAutoCompleteLineageEntry());
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ed1439afbed..e0175a146b7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -4639,7 +4639,8 @@ public class PinotHelixResourceManager {
                 } else {
                   // Update the lineage entry to 'REVERTED'
                   entry.setValue(new 
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
-                      LineageEntryState.REVERTED, System.currentTimeMillis()));
+                      LineageEntryState.REVERTED, System.currentTimeMillis(),
+                      lineageEntry.isAutoCompleteLineageEntry()));
                 }
 
                 // Add segments for proactive clean-up.
@@ -4816,7 +4817,7 @@ public class PinotHelixResourceManager {
         // Update lineage entry
         LineageEntry lineageEntryToUpdate =
             new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsTo, 
LineageEntryState.COMPLETED,
-                System.currentTimeMillis());
+                System.currentTimeMillis(), 
lineageEntry.isAutoCompleteLineageEntry());
 
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
         Map<String, String> customMap =
@@ -4958,7 +4959,7 @@ public class PinotHelixResourceManager {
         // Update segment lineage entry to 'REVERTED'
         LineageEntry lineageEntryToUpdate =
             new LineageEntry(lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
-                System.currentTimeMillis());
+                System.currentTimeMillis(), 
lineageEntry.isAutoCompleteLineageEntry());
 
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
         Map<String, String> customMap =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to