This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cc278d232cf Add auto complete flag to Lineage entry (#18584)
cc278d232cf is described below
commit cc278d232cfe1d0b2905a71f9be9ebe2a3fb58e7
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Jun 4 02:23:35 2026 +0530
Add auto complete flag to Lineage entry (#18584)
---
.../apache/pinot/common/lineage/LineageEntry.java | 22 +++++++++--
.../pinot/common/lineage/SegmentLineage.java | 15 +++++--
.../pinot/common/lineage/SegmentLineageTest.java | 46 ++++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 7 ++--
4 files changed, 81 insertions(+), 9 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
index e008b499db1..7a018b91fbd 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/LineageEntry.java
@@ -31,12 +31,24 @@ public class LineageEntry {
private final List<String> _segmentsTo;
private final LineageEntryState _state;
private final long _timestamp;
+ // Currently, lineage entries must be marked completed by clients calling {@code endReplaceSegments}.
+ // For some use cases, we can simplify the client logic by allowing an observer (e.g. a controller-side periodic task)
+ // to mark lineage entries completed automatically once all the To segments are ONLINE in the external view.
+ // This field allows us to opt in to this behavior on a per-entry basis.
+ // Default false preserves client-driven completion semantics.
+ private final boolean _autoCompleteLineageEntry;
public LineageEntry(List<String> segmentsFrom, List<String> segmentsTo,
LineageEntryState state, long timestamp) {
+ this(segmentsFrom, segmentsTo, state, timestamp, false);
+ }
+
+ public LineageEntry(List<String> segmentsFrom, List<String> segmentsTo,
LineageEntryState state, long timestamp,
+ boolean autoCompleteLineageEntry) {
_segmentsFrom = segmentsFrom;
_segmentsTo = segmentsTo;
_state = state;
_timestamp = timestamp;
+ _autoCompleteLineageEntry = autoCompleteLineageEntry;
}
public List<String> getSegmentsFrom() {
@@ -55,6 +67,10 @@ public class LineageEntry {
return _timestamp;
}
+ public boolean isAutoCompleteLineageEntry() {
+ return _autoCompleteLineageEntry;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -64,12 +80,12 @@ public class LineageEntry {
return false;
}
LineageEntry that = (LineageEntry) o;
- return _timestamp == that._timestamp &&
_segmentsFrom.equals(that._segmentsFrom) && _segmentsTo
- .equals(that._segmentsTo) && _state == that._state;
+ return _timestamp == that._timestamp && _autoCompleteLineageEntry ==
that._autoCompleteLineageEntry
+ && _segmentsFrom.equals(that._segmentsFrom) &&
_segmentsTo.equals(that._segmentsTo) && _state == that._state;
}
@Override
public int hashCode() {
- return Objects.hash(_segmentsFrom, _segmentsTo, _state, _timestamp);
+ return Objects.hash(_segmentsFrom, _segmentsTo, _state, _timestamp,
_autoCompleteLineageEntry);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
index b3776ddc9f2..ad5d533af45 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
@@ -149,12 +149,17 @@ public class SegmentLineage {
for (Map.Entry<String, List<String>> listField : listFields.entrySet()) {
String lineageId = listField.getKey();
List<String> value = listField.getValue();
- Preconditions.checkState(value.size() == 4);
+ // Tolerant read: legacy entries are 4-tuples; newer entries append a 5th element ("autoCompleteLineageEntry")
+ // only when it is true, and a missing 5th element defaults to false. This reader accepts records from both older
+ // and newer writers; older readers, however, require exactly 4 elements and cannot parse entries with the flag set.
+ Preconditions.checkState(value.size() >= 4);
List<String> segmentsFrom =
Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR));
List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1),
COMMA_SEPARATOR));
LineageEntryState state = LineageEntryState.valueOf(value.get(2));
long timestamp = Long.parseLong(value.get(3));
- lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo,
state, timestamp));
+ boolean autoCompleteLineageEntry = value.size() >= 5 &&
Boolean.parseBoolean(value.get(4));
+ lineageEntries.put(lineageId,
+ new LineageEntry(segmentsFrom, segmentsTo, state, timestamp,
autoCompleteLineageEntry));
}
return new SegmentLineage(tableNameWithType, lineageEntries, customMap);
}
@@ -171,7 +176,11 @@ public class SegmentLineage {
String segmentsTo = String.join(COMMA_SEPARATOR,
lineageEntry.getSegmentsTo());
String state = lineageEntry.getState().toString();
String timestamp = Long.toString(lineageEntry.getTimestamp());
- List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state,
timestamp);
+ // Omit the 5th element when the flag is the default (false): keeps the
wire format
+ // identical to legacy 4-tuples for every entry that does not opt into
observer-driven
+ // completion, so older readers continue to parse them.
+ List<String> listEntry = lineageEntry.isAutoCompleteLineageEntry() ?
Arrays.asList(segmentsFrom, segmentsTo,
+ state, timestamp, Boolean.toString(true)) :
Arrays.asList(segmentsFrom, segmentsTo, state, timestamp);
znRecord.setListField(entry.getKey(), listEntry);
}
if (_customMap != null) {
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
index 2c61962a3a5..996a216cd06 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
@@ -68,6 +68,20 @@ public class SegmentLineageTest {
Assert.assertEquals(lineageEntry4.getSegmentsTo(), Arrays.asList("s12"));
Assert.assertEquals(lineageEntry4.getState(),
LineageEntryState.IN_PROGRESS);
Assert.assertEquals(lineageEntry4.getTimestamp(), 44444L);
+ Assert.assertFalse(lineageEntry4.isAutoCompleteLineageEntry());
+
+ // Entry opting into observer-driven completion: should round-trip the
flag and emit a 5-tuple
+ // in the ZNRecord list field, while the other entries stay 4-tuples.
+ String id5 = SegmentLineageUtils.generateLineageEntryId();
+ segmentLineage.addLineageEntry(id5,
+ new LineageEntry(Arrays.asList("s13", "s14"), Arrays.asList("s15"),
LineageEntryState.IN_PROGRESS, 55555L,
+ true));
+ LineageEntry lineageEntry5 = segmentLineage.getLineageEntry(id5);
+ Assert.assertEquals(lineageEntry5.getSegmentsFrom(), Arrays.asList("s13",
"s14"));
+ Assert.assertEquals(lineageEntry5.getSegmentsTo(), Arrays.asList("s15"));
+ Assert.assertEquals(lineageEntry5.getState(),
LineageEntryState.IN_PROGRESS);
+ Assert.assertEquals(lineageEntry5.getTimestamp(), 55555L);
+ Assert.assertTrue(lineageEntry5.isAutoCompleteLineageEntry());
// Test the convesion from the segment lineage to the znRecord
ZNRecord znRecord = segmentLineage.toZNRecord();
@@ -97,6 +111,15 @@ public class SegmentLineageTest {
Assert.assertEquals(entry4.get(1), String.join(",", Arrays.asList("s12")));
Assert.assertEquals(entry4.get(2),
LineageEntryState.IN_PROGRESS.toString());
Assert.assertEquals(entry4.get(3), Long.toString(44444L));
+ // Default-false entries must stay 4-tuples on the wire to keep old
readers happy.
+ Assert.assertEquals(entry4.size(), 4);
+
+ List<String> entry5 = listFields.get(id5);
+ Assert.assertEquals(entry5.get(0), String.join(",", Arrays.asList("s13",
"s14")));
+ Assert.assertEquals(entry5.get(1), String.join(",", Arrays.asList("s15")));
+ Assert.assertEquals(entry5.get(2),
LineageEntryState.IN_PROGRESS.toString());
+ Assert.assertEquals(entry5.get(3), Long.toString(55555L));
+ Assert.assertEquals(entry5.get(4), Boolean.toString(true));
// Test the conversion from the znRecord to the segment lineage
SegmentLineage segmentLineageFromZNRecord =
SegmentLineage.fromZNRecord(segmentLineage.toZNRecord());
@@ -104,6 +127,7 @@ public class SegmentLineageTest {
Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id2),
lineageEntry2);
Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id3),
lineageEntry3);
Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id4),
lineageEntry4);
+ Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id5),
lineageEntry5);
// Try to delete by iterating through the lineage entry ids
for (String lineageId : segmentLineage.getLineageEntryIds()) {
@@ -134,5 +158,27 @@ public class SegmentLineageTest {
actualLineageEntry =
new LineageEntry(Arrays.asList("seg1"), Arrays.asList("seg3", "seg4"),
LineageEntryState.REVERTED, 12345L);
Assert.assertNotEquals(actualLineageEntry, expectedLineageEntry);
+
+ // Entries that differ only in autoCompleteLineageEntry must not be equal.
+ LineageEntry flaggedEntry =
+ new LineageEntry(Arrays.asList("seg1", "seg2"), Arrays.asList("seg3",
"seg4"), LineageEntryState.IN_PROGRESS,
+ 12345L, true);
+ Assert.assertNotEquals(flaggedEntry, expectedLineageEntry);
+ }
+
+ @Test
+ public void testLegacyFourTupleReadsAsAutoCompleteFalse() {
+ // A ZNRecord written by an older controller (4-tuple list fields) must
still parse, with the
+ // missing 5th element defaulting to false. Hand-build the ZNRecord to
simulate the legacy wire
+ // format directly.
+ ZNRecord legacy = new ZNRecord("test_OFFLINE");
+ legacy.setListField("legacy-1", Arrays.asList("a,b", "c",
LineageEntryState.IN_PROGRESS.toString(), "99999"));
+ SegmentLineage lineage = SegmentLineage.fromZNRecord(legacy);
+ LineageEntry parsed = lineage.getLineageEntry("legacy-1");
+ Assert.assertEquals(parsed.getSegmentsFrom(), Arrays.asList("a", "b"));
+ Assert.assertEquals(parsed.getSegmentsTo(), Arrays.asList("c"));
+ Assert.assertEquals(parsed.getState(), LineageEntryState.IN_PROGRESS);
+ Assert.assertEquals(parsed.getTimestamp(), 99999L);
+ Assert.assertFalse(parsed.isAutoCompleteLineageEntry());
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ed1439afbed..e0175a146b7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -4639,7 +4639,8 @@ public class PinotHelixResourceManager {
} else {
// Update the lineage entry to 'REVERTED'
entry.setValue(new
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
- LineageEntryState.REVERTED, System.currentTimeMillis()));
+ LineageEntryState.REVERTED, System.currentTimeMillis(),
+ lineageEntry.isAutoCompleteLineageEntry()));
}
// Add segments for proactive clean-up.
@@ -4816,7 +4817,7 @@ public class PinotHelixResourceManager {
// Update lineage entry
LineageEntry lineageEntryToUpdate =
new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsTo,
LineageEntryState.COMPLETED,
- System.currentTimeMillis());
+ System.currentTimeMillis(),
lineageEntry.isAutoCompleteLineageEntry());
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Map<String, String> customMap =
@@ -4958,7 +4959,7 @@ public class PinotHelixResourceManager {
// Update segment lineage entry to 'REVERTED'
LineageEntry lineageEntryToUpdate =
new LineageEntry(lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
- System.currentTimeMillis());
+ System.currentTimeMillis(),
lineageEntry.isAutoCompleteLineageEntry());
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Map<String, String> customMap =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]