xiangfu0 commented on code in PR #13735:
URL: https://github.com/apache/pinot/pull/13735#discussion_r1706016936
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3475,167 +3486,168 @@ public String startReplaceSegments(String
tableNameWithType, List<String> segmen
Preconditions.checkState(!segmentsForTable.contains(segment), "Segment:
%s from 'segmentsTo' exists in table: %s",
segment, tableNameWithType);
}
+ List<String> segmentsToCleanUp = new ArrayList<>();
+ synchronized (getLineageUpdaterLock(tableNameWithType)) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch table config
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
+
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ SegmentLineage segmentLineage;
+ int expectedVersion;
+ if (segmentLineageZNRecord == null) {
+ segmentLineage = new SegmentLineage(tableNameWithType);
+ expectedVersion = -1;
+ } else {
+ segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ expectedVersion = segmentLineageZNRecord.getVersion();
+ }
+ // Check that the segment lineage entry id doesn't exist in the
segment lineage
+
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
+ "Entry id: %s already exists in the segment lineage for table:
%s", segmentLineageEntryId,
+ tableNameWithType);
+
+ Iterator<Map.Entry<String, LineageEntry>> entryIterator =
+ segmentLineage.getLineageEntries().entrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Map.Entry<String, LineageEntry> entry = entryIterator.next();
+ String entryId = entry.getKey();
+ LineageEntry lineageEntry = entry.getValue();
+
+ // If the lineage entry is in 'REVERTED' state, no need to go
through the validation because we can regard
+ // the entry as not existing.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+ // When 'forceCleanup' is enabled, proactively clean up
'segmentsTo' since it's safe to do so.
+ if (forceCleanup) {
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ }
+ continue;
+ }
- try {
- DEFAULT_RETRY_POLICY.attempt(() -> {
- // Fetch table config
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
-
- // Fetch the segment lineage metadata
- ZNRecord segmentLineageZNRecord =
-
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
- SegmentLineage segmentLineage;
- int expectedVersion;
- if (segmentLineageZNRecord == null) {
- segmentLineage = new SegmentLineage(tableNameWithType);
- expectedVersion = -1;
- } else {
- segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
- expectedVersion = segmentLineageZNRecord.getVersion();
- }
- // Check that the segment lineage entry id doesn't exist in the
segment lineage
-
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
- "Entry id: %s already exists in the segment lineage for table:
%s", segmentLineageEntryId,
- tableNameWithType);
+ // By here, the lineage entry is either 'IN_PROGRESS' or
'COMPLETED'.
- List<String> segmentsToCleanUp = new ArrayList<>();
- Iterator<Map.Entry<String, LineageEntry>> entryIterator =
- segmentLineage.getLineageEntries().entrySet().iterator();
- while (entryIterator.hasNext()) {
- Map.Entry<String, LineageEntry> entry = entryIterator.next();
- String entryId = entry.getKey();
- LineageEntry lineageEntry = entry.getValue();
-
- // If the lineage entry is in 'REVERTED' state, no need to go
through the validation because we can regard
- // the entry as not existing.
- if (lineageEntry.getState() == LineageEntryState.REVERTED) {
- // When 'forceCleanup' is enabled, proactively clean up
'segmentsTo' since it's safe to do so.
+ // When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
+ // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentsFrom' or 'segmentsTo'
+ // values. This is used to un-block the segment replacement
protocol if the previous attempt failed in
+ // the middle.
+ // 2. Proactively delete the oldest data snapshot to make sure
that we only keep at most 2 data snapshots
+ // at any time in case of REFRESH use case.
if (forceCleanup) {
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
- }
- continue;
- }
+ if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
+ !Collections.disjoint(segmentsFrom,
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(
+ segmentsTo, lineageEntry.getSegmentsTo()))) {
+ LOGGER.info(
+ "Detected the incomplete lineage entry with overlapped
'segmentsFrom' or 'segmentsTo'. Deleting or "
+ + "reverting the lineage entry to unblock the new
segment protocol. tableNameWithType={}, "
+ + "entryId={}, segmentsFrom={}, segmentsTo={}",
tableNameWithType, entryId,
+ lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
+
+ // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+ // Delete or update segmentsTo of the entry to revert to
handle the case of rerunning the protocol:
+ // Initial state:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
+ // 1. Rerunning the protocol with s4 and s5, s4 should not be
deleted to avoid race conditions of
+ // concurrent data pushes and deletions:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3],
status: REVERTED}
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5],
status: IN_PROGRESS}
+ // 2. Rerunning the protocol with s3 and s4, we can simply
remove the 'IN_PROGRESS' entry:
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
+ List<String> segmentsToForEntryToRevert = new
ArrayList<>(lineageEntry.getSegmentsTo());
+ segmentsToForEntryToRevert.removeAll(segmentsTo);
+ if (segmentsToForEntryToRevert.isEmpty()) {
+ // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+ entryIterator.remove();
+ } else {
+ // Update the lineage entry to 'REVERTED'
+ entry.setValue(new
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
+ LineageEntryState.REVERTED, System.currentTimeMillis()));
+ }
- // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
-
- // When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
- // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentsFrom' or 'segmentsTo'
- // values. This is used to un-block the segment replacement
protocol if the previous attempt failed in the
- // middle.
- // 2. Proactively delete the oldest data snapshot to make sure that
we only keep at most 2 data snapshots
- // at any time in case of REFRESH use case.
- if (forceCleanup) {
- if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
- !Collections.disjoint(segmentsFrom,
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo,
- lineageEntry.getSegmentsTo()))) {
- LOGGER.info(
- "Detected the incomplete lineage entry with overlapped
'segmentsFrom' or 'segmentsTo'. Deleting or "
- + "reverting the lineage entry to unblock the new
segment protocol. tableNameWithType={}, "
- + "entryId={}, segmentsFrom={}, segmentsTo={}",
tableNameWithType, entryId,
- lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
-
- // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
- // Delete or update segmentsTo of the entry to revert to handle
the case of rerunning the protocol:
- // Initial state:
- // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
- // 1. Rerunning the protocol with s4 and s5, s4 should not be
deleted to avoid race conditions of
- // concurrent data pushes and deletions:
- // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status:
REVERTED}
- // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5],
status: IN_PROGRESS}
- // 2. Rerunning the protocol with s3 and s4, we can simply
remove the 'IN_PROGRESS' entry:
- // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
- List<String> segmentsToForEntryToRevert = new
ArrayList<>(lineageEntry.getSegmentsTo());
- segmentsToForEntryToRevert.removeAll(segmentsTo);
- if (segmentsToForEntryToRevert.isEmpty()) {
- // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
- entryIterator.remove();
- } else {
- // Update the lineage entry to 'REVERTED'
- entry.setValue(new
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
- LineageEntryState.REVERTED, System.currentTimeMillis()));
+ // Add segments for proactive clean-up.
+ segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
+ } else if (lineageEntry.getState() ==
LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase(
+
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) &&
CollectionUtils.isEqualCollection(
+ segmentsFrom, lineageEntry.getSegmentsTo())) {
+ // This part of code assumes that we only allow at most 2 data
snapshots at a time by proactively
+ // deleting the older snapshots (for REFRESH tables).
+ //
+ // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) //
previous lineage
+ // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) //
current lineage to be updated
+ // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want
to keep 2 data snapshots
+ // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to
avoid the disk space waste.
+ //
+ // TODO: make the number of allowed snapshots configurable to
allow users to keep at most N snapshots
+ // of data. We need to traverse the lineage by N steps
instead of 2 steps. We can build the
+ // reverse hash map (segmentsTo -> segmentsFrom) and
traverse up to N times before deleting.
+ LOGGER.info(
+ "Proactively deleting the replaced segments for REFRESH
table to avoid the excessive disk waste. "
+ + "tableNameWithType={}, segmentsToCleanUp={}",
tableNameWithType,
+ lineageEntry.getSegmentsFrom());
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
}
-
- // Add segments for proactive clean-up.
- segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
- } else if (lineageEntry.getState() == LineageEntryState.COMPLETED
&& "REFRESH".equalsIgnoreCase(
-
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) &&
CollectionUtils.isEqualCollection(
- segmentsFrom, lineageEntry.getSegmentsTo())) {
- // This part of code assumes that we only allow at most 2 data
snapshots at a time by proactively
- // deleting the older snapshots (for REFRESH tables).
- //
- // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) //
previous lineage
- // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) //
current lineage to be updated
- // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to
keep 2 data snapshots
- // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid
the disk space waste.
- //
- // TODO: make the number of allowed snapshots configurable to
allow users to keep at most N snapshots
- // of data. We need to traverse the lineage by N steps
instead of 2 steps. We can build the reverse
- // hash map (segmentsTo -> segmentsFrom) and traverse up
to N times before deleting.
- //
- LOGGER.info(
- "Proactively deleting the replaced segments for REFRESH
table to avoid the excessive disk waste. "
- + "tableNameWithType={}, segmentsToCleanUp={}",
tableNameWithType,
- lineageEntry.getSegmentsFrom());
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
- }
- } else {
- // Check that any segment from 'segmentsFrom' does not appear
twice.
- if (!segmentsFrom.isEmpty()) {
- Set<String> segmentsFromInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsFrom());
- if (!segmentsFromInLineageEntry.isEmpty()) {
- for (String segment : segmentsFrom) {
-
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
- "Segment: %s from 'segmentsFrom' exists in table: %s,
entry id: %s as 'segmentsFrom'"
- + " (replacing a replaced segment)", segment,
tableNameWithType, entryId);
+ } else {
+ // Check that any segment from 'segmentsFrom' does not appear
twice.
+ if (!segmentsFrom.isEmpty()) {
+ Set<String> segmentsFromInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsFrom());
+ if (!segmentsFromInLineageEntry.isEmpty()) {
+ for (String segment : segmentsFrom) {
+
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
+ "Segment: %s from 'segmentsFrom' exists in table: %s,
entry id: %s as 'segmentsFrom'"
+ + " (replacing a replaced segment)", segment,
tableNameWithType, entryId);
+ }
}
}
- }
- if (!segmentsTo.isEmpty()) {
- Set<String> segmentsToInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsTo());
- if (!segmentsToInLineageEntry.isEmpty()) {
- for (String segment : segmentsTo) {
-
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
- "Segment: %s from 'segmentsTo' exists in table: %s,
entry id: %s as 'segmentTo'"
- + " (name conflict)", segment, tableNameWithType,
entryId);
+ if (!segmentsTo.isEmpty()) {
+ Set<String> segmentsToInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsTo());
+ if (!segmentsToInLineageEntry.isEmpty()) {
+ for (String segment : segmentsTo) {
+
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
+ "Segment: %s from 'segmentsTo' exists in table: %s,
entry id: %s as 'segmentTo'"
+ + " (name conflict)", segment, tableNameWithType,
entryId);
+ }
}
}
}
}
- }
- // Update lineage entry
- segmentLineage.addLineageEntry(segmentLineageEntryId,
- new LineageEntry(segmentsFrom, segmentsTo,
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
-
- _lineageManager.updateLineageForStartReplaceSegments(tableConfig,
segmentLineageEntryId, customMap,
- segmentLineage);
- // Write back to the lineage entry to the property store
- if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
- // Trigger the proactive segment clean up if needed. Once the
lineage is updated in the property store, it
- // is safe to physically delete segments.
- if (!segmentsToCleanUp.isEmpty()) {
- LOGGER.info("Cleaning up the segments while startReplaceSegments:
{}", segmentsToCleanUp);
- deleteSegments(tableNameWithType, segmentsToCleanUp);
+ // Update lineage entry
+ segmentLineage.addLineageEntry(segmentLineageEntryId,
+ new LineageEntry(segmentsFrom, segmentsTo,
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+ _lineageManager.updateLineageForStartReplaceSegments(tableConfig,
segmentLineageEntryId, customMap,
+ segmentLineage);
+ // Write back to the lineage entry to the property store
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
+ return true;
+ } else {
+ LOGGER.warn("Failed to write segment lineage for table: {}",
tableNameWithType);
+ return false;
}
- return true;
- } else {
- LOGGER.warn("Failed to write segment lineage for table: {}",
tableNameWithType);
- return false;
- }
- });
- } catch (Exception e) {
- String errorMsg = String.format("Failed to update the segment lineage
during startReplaceSegments. "
- + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
tableNameWithType, segmentsFrom, segmentsTo);
- LOGGER.error(errorMsg, e);
- throw new RuntimeException(errorMsg, e);
+ });
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to update the segment lineage
during startReplaceSegments. "
+ + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
tableNameWithType, segmentsFrom, segmentsTo);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ // Trigger the proactive segment clean up if needed. Once the lineage is
updated in the property store, it
+ // is safe to physically delete segments.
+ if (!segmentsToCleanUp.isEmpty()) {
+ LOGGER.info("Cleaning up the segments while startReplaceSegments: {}",
segmentsToCleanUp);
+ deleteSegments(tableNameWithType, segmentsToCleanUp);
}
// Only successful attempt can reach here
- LOGGER.info("startReplaceSegments is successfully processed.
(tableNameWithType = {}, segmentsFrom = {}, "
- + "segmentsTo = {}, segmentLineageEntryId = {})",
tableNameWithType, segmentsFrom, segmentsTo,
- segmentLineageEntryId);
+ LOGGER.info("startReplaceSegments is successfully processed in {} ms.
(tableNameWithType = {}, segmentsFrom = {}, "
Review Comment:
Should we also log the number of retry attempts, or even the time taken by each attempt?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3475,167 +3486,168 @@ public String startReplaceSegments(String
tableNameWithType, List<String> segmen
Preconditions.checkState(!segmentsForTable.contains(segment), "Segment:
%s from 'segmentsTo' exists in table: %s",
segment, tableNameWithType);
}
+ List<String> segmentsToCleanUp = new ArrayList<>();
+ synchronized (getLineageUpdaterLock(tableNameWithType)) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch table config
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
+
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ SegmentLineage segmentLineage;
+ int expectedVersion;
+ if (segmentLineageZNRecord == null) {
+ segmentLineage = new SegmentLineage(tableNameWithType);
+ expectedVersion = -1;
+ } else {
+ segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ expectedVersion = segmentLineageZNRecord.getVersion();
+ }
+ // Check that the segment lineage entry id doesn't exist in the
segment lineage
+
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
+ "Entry id: %s already exists in the segment lineage for table:
%s", segmentLineageEntryId,
+ tableNameWithType);
+
+ Iterator<Map.Entry<String, LineageEntry>> entryIterator =
+ segmentLineage.getLineageEntries().entrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Map.Entry<String, LineageEntry> entry = entryIterator.next();
+ String entryId = entry.getKey();
+ LineageEntry lineageEntry = entry.getValue();
+
+ // If the lineage entry is in 'REVERTED' state, no need to go
through the validation because we can regard
+ // the entry as not existing.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+ // When 'forceCleanup' is enabled, proactively clean up
'segmentsTo' since it's safe to do so.
+ if (forceCleanup) {
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ }
+ continue;
+ }
- try {
- DEFAULT_RETRY_POLICY.attempt(() -> {
- // Fetch table config
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
-
- // Fetch the segment lineage metadata
- ZNRecord segmentLineageZNRecord =
-
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
- SegmentLineage segmentLineage;
- int expectedVersion;
- if (segmentLineageZNRecord == null) {
- segmentLineage = new SegmentLineage(tableNameWithType);
- expectedVersion = -1;
- } else {
- segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
- expectedVersion = segmentLineageZNRecord.getVersion();
- }
- // Check that the segment lineage entry id doesn't exist in the
segment lineage
-
Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
- "Entry id: %s already exists in the segment lineage for table:
%s", segmentLineageEntryId,
- tableNameWithType);
+ // By here, the lineage entry is either 'IN_PROGRESS' or
'COMPLETED'.
- List<String> segmentsToCleanUp = new ArrayList<>();
- Iterator<Map.Entry<String, LineageEntry>> entryIterator =
- segmentLineage.getLineageEntries().entrySet().iterator();
- while (entryIterator.hasNext()) {
- Map.Entry<String, LineageEntry> entry = entryIterator.next();
- String entryId = entry.getKey();
- LineageEntry lineageEntry = entry.getValue();
-
- // If the lineage entry is in 'REVERTED' state, no need to go
through the validation because we can regard
- // the entry as not existing.
- if (lineageEntry.getState() == LineageEntryState.REVERTED) {
- // When 'forceCleanup' is enabled, proactively clean up
'segmentsTo' since it's safe to do so.
+ // When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
+ // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentsFrom' or 'segmentsTo'
+ // values. This is used to un-block the segment replacement
protocol if the previous attempt failed in
+ // the middle.
+ // 2. Proactively delete the oldest data snapshot to make sure
that we only keep at most 2 data snapshots
+ // at any time in case of REFRESH use case.
if (forceCleanup) {
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
- }
- continue;
- }
+ if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
+ !Collections.disjoint(segmentsFrom,
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(
+ segmentsTo, lineageEntry.getSegmentsTo()))) {
+ LOGGER.info(
+ "Detected the incomplete lineage entry with overlapped
'segmentsFrom' or 'segmentsTo'. Deleting or "
+ + "reverting the lineage entry to unblock the new
segment protocol. tableNameWithType={}, "
+ + "entryId={}, segmentsFrom={}, segmentsTo={}",
tableNameWithType, entryId,
+ lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
+
+ // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+ // Delete or update segmentsTo of the entry to revert to
handle the case of rerunning the protocol:
+ // Initial state:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
+ // 1. Rerunning the protocol with s4 and s5, s4 should not be
deleted to avoid race conditions of
+ // concurrent data pushes and deletions:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3],
status: REVERTED}
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5],
status: IN_PROGRESS}
+ // 2. Rerunning the protocol with s3 and s4, we can simply
remove the 'IN_PROGRESS' entry:
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
+ List<String> segmentsToForEntryToRevert = new
ArrayList<>(lineageEntry.getSegmentsTo());
+ segmentsToForEntryToRevert.removeAll(segmentsTo);
+ if (segmentsToForEntryToRevert.isEmpty()) {
+ // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+ entryIterator.remove();
+ } else {
+ // Update the lineage entry to 'REVERTED'
+ entry.setValue(new
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
+ LineageEntryState.REVERTED, System.currentTimeMillis()));
+ }
- // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
-
- // When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
- // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentsFrom' or 'segmentsTo'
- // values. This is used to un-block the segment replacement
protocol if the previous attempt failed in the
- // middle.
- // 2. Proactively delete the oldest data snapshot to make sure that
we only keep at most 2 data snapshots
- // at any time in case of REFRESH use case.
- if (forceCleanup) {
- if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (
- !Collections.disjoint(segmentsFrom,
lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo,
- lineageEntry.getSegmentsTo()))) {
- LOGGER.info(
- "Detected the incomplete lineage entry with overlapped
'segmentsFrom' or 'segmentsTo'. Deleting or "
- + "reverting the lineage entry to unblock the new
segment protocol. tableNameWithType={}, "
- + "entryId={}, segmentsFrom={}, segmentsTo={}",
tableNameWithType, entryId,
- lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
-
- // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
- // Delete or update segmentsTo of the entry to revert to handle
the case of rerunning the protocol:
- // Initial state:
- // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
- // 1. Rerunning the protocol with s4 and s5, s4 should not be
deleted to avoid race conditions of
- // concurrent data pushes and deletions:
- // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status:
REVERTED}
- // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5],
status: IN_PROGRESS}
- // 2. Rerunning the protocol with s3 and s4, we can simply
remove the 'IN_PROGRESS' entry:
- // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4],
status: IN_PROGRESS}
- List<String> segmentsToForEntryToRevert = new
ArrayList<>(lineageEntry.getSegmentsTo());
- segmentsToForEntryToRevert.removeAll(segmentsTo);
- if (segmentsToForEntryToRevert.isEmpty()) {
- // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
- entryIterator.remove();
- } else {
- // Update the lineage entry to 'REVERTED'
- entry.setValue(new
LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert,
- LineageEntryState.REVERTED, System.currentTimeMillis()));
+ // Add segments for proactive clean-up.
+ segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
+ } else if (lineageEntry.getState() ==
LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase(
+
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) &&
CollectionUtils.isEqualCollection(
+ segmentsFrom, lineageEntry.getSegmentsTo())) {
+ // This part of code assumes that we only allow at most 2 data
snapshots at a time by proactively
+ // deleting the older snapshots (for REFRESH tables).
+ //
+ // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) //
previous lineage
+ // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) //
current lineage to be updated
+ // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want
to keep 2 data snapshots
+ // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to
avoid the disk space waste.
+ //
+ // TODO: make the number of allowed snapshots configurable to
allow users to keep at most N snapshots
+ // of data. We need to traverse the lineage by N steps
instead of 2 steps. We can build the
+ // reverse hash map (segmentsTo -> segmentsFrom) and
traverse up to N times before deleting.
+ LOGGER.info(
+ "Proactively deleting the replaced segments for REFRESH
table to avoid the excessive disk waste. "
+ + "tableNameWithType={}, segmentsToCleanUp={}",
tableNameWithType,
+ lineageEntry.getSegmentsFrom());
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
}
-
- // Add segments for proactive clean-up.
- segmentsToCleanUp.addAll(segmentsToForEntryToRevert);
- } else if (lineageEntry.getState() == LineageEntryState.COMPLETED
&& "REFRESH".equalsIgnoreCase(
-
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) &&
CollectionUtils.isEqualCollection(
- segmentsFrom, lineageEntry.getSegmentsTo())) {
- // This part of code assumes that we only allow at most 2 data
snapshots at a time by proactively
- // deleting the older snapshots (for REFRESH tables).
- //
- // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) //
previous lineage
- // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) //
current lineage to be updated
- // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to
keep 2 data snapshots
- // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid
the disk space waste.
- //
- // TODO: make the number of allowed snapshots configurable to
allow users to keep at most N snapshots
- // of data. We need to traverse the lineage by N steps
instead of 2 steps. We can build the reverse
- // hash map (segmentsTo -> segmentsFrom) and traverse up
to N times before deleting.
- //
- LOGGER.info(
- "Proactively deleting the replaced segments for REFRESH
table to avoid the excessive disk waste. "
- + "tableNameWithType={}, segmentsToCleanUp={}",
tableNameWithType,
- lineageEntry.getSegmentsFrom());
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
- }
- } else {
- // Check that any segment from 'segmentsFrom' does not appear
twice.
- if (!segmentsFrom.isEmpty()) {
- Set<String> segmentsFromInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsFrom());
- if (!segmentsFromInLineageEntry.isEmpty()) {
- for (String segment : segmentsFrom) {
-
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
- "Segment: %s from 'segmentsFrom' exists in table: %s,
entry id: %s as 'segmentsFrom'"
- + " (replacing a replaced segment)", segment,
tableNameWithType, entryId);
+ } else {
+ // Check that any segment from 'segmentsFrom' does not appear
twice.
+ if (!segmentsFrom.isEmpty()) {
+ Set<String> segmentsFromInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsFrom());
+ if (!segmentsFromInLineageEntry.isEmpty()) {
+ for (String segment : segmentsFrom) {
+
Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment),
+ "Segment: %s from 'segmentsFrom' exists in table: %s,
entry id: %s as 'segmentsFrom'"
+ + " (replacing a replaced segment)", segment,
tableNameWithType, entryId);
+ }
}
}
- }
- if (!segmentsTo.isEmpty()) {
- Set<String> segmentsToInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsTo());
- if (!segmentsToInLineageEntry.isEmpty()) {
- for (String segment : segmentsTo) {
-
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
- "Segment: %s from 'segmentsTo' exists in table: %s,
entry id: %s as 'segmentTo'"
- + " (name conflict)", segment, tableNameWithType,
entryId);
+ if (!segmentsTo.isEmpty()) {
+ Set<String> segmentsToInLineageEntry = new
HashSet<>(lineageEntry.getSegmentsTo());
+ if (!segmentsToInLineageEntry.isEmpty()) {
+ for (String segment : segmentsTo) {
+
Preconditions.checkState(!segmentsToInLineageEntry.contains(segment),
+ "Segment: %s from 'segmentsTo' exists in table: %s,
entry id: %s as 'segmentTo'"
+ + " (name conflict)", segment, tableNameWithType,
entryId);
+ }
}
}
}
}
- }
- // Update lineage entry
- segmentLineage.addLineageEntry(segmentLineageEntryId,
- new LineageEntry(segmentsFrom, segmentsTo,
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
-
- _lineageManager.updateLineageForStartReplaceSegments(tableConfig,
segmentLineageEntryId, customMap,
- segmentLineage);
- // Write back to the lineage entry to the property store
- if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
- // Trigger the proactive segment clean up if needed. Once the
lineage is updated in the property store, it
- // is safe to physically delete segments.
- if (!segmentsToCleanUp.isEmpty()) {
- LOGGER.info("Cleaning up the segments while startReplaceSegments:
{}", segmentsToCleanUp);
- deleteSegments(tableNameWithType, segmentsToCleanUp);
+ // Update lineage entry
+ segmentLineage.addLineageEntry(segmentLineageEntryId,
+ new LineageEntry(segmentsFrom, segmentsTo,
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+ _lineageManager.updateLineageForStartReplaceSegments(tableConfig,
segmentLineageEntryId, customMap,
+ segmentLineage);
+ // Write back to the lineage entry to the property store
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
Review Comment:
Can we log the time taken by each lineage computation, to get a sense of how
long internal lineage reprocessing takes?
If it turns out to take significant time, we should consider introducing a
table-level distributed lock.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String
tableNameWithType, String segmentLineag
* @param lineageUpdateType
* @param customMap
*/
- private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig,
String lineageEntryId,
+ private boolean writeLineageEntryWithLock(TableConfig tableConfig, String
lineageEntryId,
LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch,
ZkHelixPropertyStore<ZNRecord> propertyStore,
LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
- for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
- // Fetch the segment lineage
- ZNRecord segmentLineageToUpdateZNRecord =
- SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore,
tableConfig.getTableName());
- int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
- SegmentLineage segmentLineageToUpdate =
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
- LineageEntry currentLineageEntry =
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
-
- // If the lineage entry doesn't match with the previously fetched
lineage, we need to fail the request.
- if (!currentLineageEntry.equals(lineageEntryToMatch)) {
- String errorMsg = String.format(
- "Aborting the to update lineage entry since we find that the entry
has been modified for table %s, "
- + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
- LOGGER.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
+ String tableNameWithType = tableConfig.getTableName();
+ synchronized (getLineageUpdaterLock(tableNameWithType)) {
+ // Fetch the segment lineage
+ ZNRecord segmentLineageToUpdateZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore,
tableConfig.getTableName());
+ int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
+ SegmentLineage segmentLineageToUpdate =
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
+ LineageEntry currentLineageEntry =
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
+
+ // If the lineage entry doesn't match with the previously fetched
lineage, we need to fail the request.
+ if (!currentLineageEntry.equals(lineageEntryToMatch)) {
+ String errorMsg = String.format(
+ "Aborting the to update lineage entry since we find that the
entry has been modified for table %s, "
+ + "entry id: %s", tableConfig.getTableName(),
lineageEntryId);
+ LOGGER.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
- // Update lineage entry
- segmentLineageToUpdate.updateLineageEntry(lineageEntryId,
lineageEntryToUpdate);
- switch (lineageUpdateType) {
- case START:
- _lineageManager.updateLineageForStartReplaceSegments(tableConfig,
lineageEntryId, customMap,
- segmentLineageToUpdate);
- break;
- case END:
- _lineageManager.updateLineageForEndReplaceSegments(tableConfig,
lineageEntryId, customMap,
- segmentLineageToUpdate);
- break;
- case REVERT:
- _lineageManager.updateLineageForRevertReplaceSegments(tableConfig,
lineageEntryId, customMap,
- segmentLineageToUpdate);
- break;
- default:
- }
+ // Update lineage entry
+ segmentLineageToUpdate.updateLineageEntry(lineageEntryId,
lineageEntryToUpdate);
+ switch (lineageUpdateType) {
+ case START:
Review Comment:
I think this `case START` branch will never be hit?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -191,7 +191,8 @@ public class PinotHelixResourceManager {
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 10;
Review Comment:
`DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY` is no longer used.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String
tableNameWithType, String segmentLineag
* @param lineageUpdateType
* @param customMap
*/
- private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig,
String lineageEntryId,
+ private boolean writeLineageEntryWithLock(TableConfig tableConfig, String
lineageEntryId,
LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch,
ZkHelixPropertyStore<ZNRecord> propertyStore,
LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
- for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
- // Fetch the segment lineage
- ZNRecord segmentLineageToUpdateZNRecord =
- SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore,
tableConfig.getTableName());
- int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
- SegmentLineage segmentLineageToUpdate =
SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
- LineageEntry currentLineageEntry =
segmentLineageToUpdate.getLineageEntry(lineageEntryId);
-
- // If the lineage entry doesn't match with the previously fetched
lineage, we need to fail the request.
- if (!currentLineageEntry.equals(lineageEntryToMatch)) {
- String errorMsg = String.format(
- "Aborting the to update lineage entry since we find that the entry
has been modified for table %s, "
- + "entry id: %s", tableConfig.getTableName(), lineageEntryId);
- LOGGER.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
+ String tableNameWithType = tableConfig.getTableName();
Review Comment:
Since other controllers may update the lineage concurrently (the `synchronized`
lock only guards updates within this controller), I think it's still better to
keep a small number of retries here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]