This is an automated email from the ASF dual-hosted git repository.
KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fdd8762953f Controller cleanups: SegmentDeletionManager fallback, LLC filter, retention strategy extraction, mock util (#18582)
fdd8762953f is described below
commit fdd8762953f8b9ee6db94f5deafe0020f1de3d9f
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu May 28 17:39:43 2026 +0530
Controller cleanups: SegmentDeletionManager fallback, LLC filter, retention strategy extraction, mock util (#18582)
---
.../helix/core/PinotHelixResourceManager.java | 14 +++-
.../helix/core/SegmentDeletionManager.java | 11 +++-
.../helix/core/retention/RetentionManager.java | 12 +---
.../core/retention/TableConfigRetentionUtils.java | 63 ++++++++++++++++++
...notHelixResourceManagerLastLLCSegmentsTest.java | 77 ++++++++++++++++++++++
.../helix/core/retention/RetentionManagerTest.java | 1 +
.../controller/utils/SegmentMetadataMockUtils.java | 14 ++++
7 files changed, 179 insertions(+), 13 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d5cb2b1d97f..909084468d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1051,10 +1051,22 @@ public class PinotHelixResourceManager {
}
public Collection<String> getLastLLCCompletedSegments(String
tableNameWithType) {
+ return
getLastLLCCompletedSegments(getSegmentsZKMetadata(tableNameWithType));
+ }
+
+ /// Overload that operates on a caller-supplied list of {@link
SegmentZKMetadata}, avoiding a
+ /// redundant ZK fetch when the caller already holds the list (e.g. periodic
tasks that scan all
+ /// segments of a table and want to derive the last-completed LLC segment
per partition without
+ /// re-reading the property store).
+ public Collection<String> getLastLLCCompletedSegments(List<? extends
SegmentZKMetadata> segmentZKMetadataList) {
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new
HashMap<>();
- for (SegmentZKMetadata zkMetadata :
getSegmentsZKMetadata(tableNameWithType)) {
+ for (SegmentZKMetadata zkMetadata : segmentZKMetadataList) {
if (zkMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
LLCSegmentName llcName =
LLCSegmentName.of(zkMetadata.getSegmentName());
+ if (llcName == null) {
+ // llcName can be null if the segment is uploaded through offline
ingestion
+ continue;
+ }
int partitionGroupId = llcName.getPartitionGroupId();
int sequenceNumber = llcName.getSequenceNumber();
String lastCompletedSegName =
partitionIdToLastLLCCompletedSegmentMap.get(partitionGroupId);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 8c4c7c4b484..3d5680fae37 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -235,9 +235,14 @@ public class SegmentDeletionManager {
for (int i = 0; i < deleteSuccessful.length; i++) {
final String segmentId = segmentsToDelete.get(i);
if (!deleteSuccessful[i]) {
- // remove API can fail because the prop store entry did not exist,
so check first.
- if (_propertyStore.exists(propStorePathList.get(i),
AccessOption.PERSISTENT)) {
- LOGGER.info("Could not delete {} from propertystore",
propStorePathList.get(i));
+ // The batch remove API takes a non-recursive ZK path: it cannot
delete a znode that has
+ // accumulated children. Fall back to the single-path remove API,
which falls back to a
+ // recursive delete on the same NotEmpty failure. Skip when the
znode is already gone
+ // (the batch call may have failed simply because the entry did not
exist).
+ String segmentPath = propStorePathList.get(i);
+ if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+ && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT))
{
+ LOGGER.info("Could not delete {} from propertystore", segmentPath);
segmentsToRetryLater.add(segmentId);
propStoreFailedSegs.add(segmentId);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 82cd4076128..9e6c3bbd8f6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -142,19 +142,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
LOGGER.info("Segment push type is not APPEND for table: {}, skip
managing retention", tableNameWithType);
return;
}
- String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
- String retentionTimeValue = validationConfig.getRetentionTimeValue();
int untrackedSegmentsDeletionBatchSize =
validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ?
Integer.parseInt(
validationConfig.getUntrackedSegmentsDeletionBatchSize()) :
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
- RetentionStrategy retentionStrategy;
- try {
- retentionStrategy = new
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
- Long.parseLong(retentionTimeValue),
_useCreationTimeFallbackForRetention);
- } catch (Exception e) {
- LOGGER.warn("Invalid retention time: {} {} for table: {}, skip",
retentionTimeUnit, retentionTimeValue,
- tableNameWithType);
+ RetentionStrategy retentionStrategy =
+ TableConfigRetentionUtils.buildRetentionStrategy(tableConfig,
_useCreationTimeFallbackForRetention);
+ if (retentionStrategy == null) {
return;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java
new file mode 100644
index 00000000000..9df3bc81438
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/TableConfigRetentionUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.retention;
+
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
+import
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Utility methods for deriving retention-related objects from a {@link
TableConfig}.
+public class TableConfigRetentionUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TableConfigRetentionUtils.class);
+
+ private TableConfigRetentionUtils() {
+ }
+
+ /// Builds a {@link RetentionStrategy} from {@code tableConfig}, or returns
{@code null} when the
+ /// retention config is absent, empty, or malformed. A null return means no
retention is configured
+ /// and no segment should be treated as purgeable.
+ ///
+ /// @param useCreationTimeFallback when true, the strategy falls back to
segment creation time
+ /// when segment end time is unavailable
+ @Nullable
+ public static RetentionStrategy buildRetentionStrategy(TableConfig
tableConfig,
+ boolean useCreationTimeFallback) {
+ SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
+ String unit = validationConfig.getRetentionTimeUnit();
+ String value = validationConfig.getRetentionTimeValue();
+ if (unit == null || unit.isEmpty() || value == null || value.isEmpty()) {
+ LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit,
value, tableConfig.getTableName());
+ return null;
+ }
+ try {
+ return new
TimeRetentionStrategy(TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT)),
Long.parseLong(value),
+ useCreationTimeFallback);
+ } catch (Exception e) {
+ LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", unit,
value, tableConfig.getTableName());
+ return null;
+ }
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java
new file mode 100644
index 00000000000..0feee5a6c25
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerLastLLCSegmentsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+public class PinotHelixResourceManagerLastLLCSegmentsTest {
+
+ private static final String TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+
+ /**
+ * A realtime table can contain non-LLC-named segments (e.g. uploaded via
batch ingestion) sitting in DONE state
+ * alongside the LLC-named consuming/committed segments. {@code
getLastLLCCompletedSegments} must skip those
+ * uploaded segments rather than NPE when {@code LLCSegmentName.of(name)}
returns {@code null}.
+ */
+ @Test
+ public void testGetLastLLCCompletedSegmentsSkipsNonLLCNamedSegments() {
+ long now = System.currentTimeMillis();
+ int partitionId = 3;
+
+ List<SegmentZKMetadata> segments = new ArrayList<>();
+ // Two LLC-named DONE segments — sequence 0 and 1 for the same partition;
sequence 1 is the latest.
+ LLCSegmentName seq0 = new LLCSegmentName(TABLE_NAME, partitionId, 0, now);
+ LLCSegmentName seq1 = new LLCSegmentName(TABLE_NAME, partitionId, 1, now);
+ segments.add(doneSegment(seq0.getSegmentName()));
+ segments.add(doneSegment(seq1.getSegmentName()));
+ // An uploaded (non-LLC-named) segment in DONE state — must be ignored,
not crash the method.
+ segments.add(doneSegment("uploaded_segment_0"));
+
+ PinotHelixResourceManager rm = mock(PinotHelixResourceManager.class);
+ when(rm.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segments);
+ when(rm.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+ when(rm.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();
+
+ Collection<String> lastCompleted =
rm.getLastLLCCompletedSegments(REALTIME_TABLE_NAME);
+ Set<String> actual = new HashSet<>(lastCompleted);
+ assertEquals(actual, Set.of(seq1.getSegmentName()));
+ }
+
+ private static SegmentZKMetadata doneSegment(String name) {
+ SegmentZKMetadata md = new SegmentZKMetadata(name);
+ md.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ return md;
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 24fde5ae5b8..c32459615db 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -808,6 +808,7 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segmentsZKMetadata);
when(pinotHelixResourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME);
when(pinotHelixResourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+ when(pinotHelixResourceManager.getLastLLCCompletedSegments(anyList())).thenCallRealMethod();
HelixAdmin helixAdmin = mock(HelixAdmin.class);
when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME,
REALTIME_TABLE_NAME)).thenReturn(idealState);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 99b5b23b7db..7a4f2c5d622 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -84,6 +84,20 @@ public class SegmentMetadataMockUtils {
return segmentZKMetadata;
}
+ public static SegmentMetadata mockSegmentMetadata(String tableName, String
segmentName, int numTotalDocs,
+ String crc, long startTime, long endTime, TimeUnit timeUnit, String
partitionColumn, int partitionId,
+ int numPartitions) {
+ SegmentMetadata segmentMetadata =
+ mockSegmentMetadata(tableName, segmentName, numTotalDocs, crc,
startTime, endTime, timeUnit);
+ ColumnMetadata colMeta = mock(ColumnMetadata.class);
+ when(colMeta.getPartitions()).thenReturn(Collections.singleton(partitionId));
+ when(colMeta.getPartitionFunction()).thenReturn(new
MurmurPartitionFunction(numPartitions, null));
+ TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
+ columnMetadataMap.put(partitionColumn, colMeta);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
+ return segmentMetadata;
+ }
+
public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String
rawTableName, String segmentName,
String columnName, int partitionNumber) {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]