This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b918196a222 Add segment-level failure details capture to reload status tracking (#17234)
b918196a222 is described below
commit b918196a222a4383d8b59d7e81e392f7c8ff6236
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Sat Nov 22 10:45:32 2025 -0800
Add segment-level failure details capture to reload status tracking (#17234)
---
.gitignore | 4 +-
.../common/response/server/ApiErrorResponse.java | 35 ++-
.../server/SegmentReloadFailureResponse.java | 69 +++++
.../server/ServerReloadStatusResponse.java | 52 +++-
.../api/dto/PinotTableReloadStatusResponse.java | 17 ++
.../services/PinotTableReloadStatusReporter.java | 35 ++-
.../core/data/manager/BaseTableDataManager.java | 2 +-
.../manager/provider/TableDataManagerProvider.java | 2 +-
.../perf/BenchmarkDimensionTableOverhead.java | 2 +-
.../pinot/segment/local/utils/ReloadJobStatus.java | 19 +-
.../local/utils/ServerReloadJobStatusCache.java | 44 +++-
.../utils/ServerReloadJobStatusCacheConfig.java | 15 +-
.../utils/ServerReloadJobStatusCacheTest.java | 285 ++++++++++++++++-----
.../api/resources/ControllerJobStatusResource.java | 25 +-
.../server/starter/helix/BaseServerStarter.java | 2 +-
15 files changed, 480 insertions(+), 128 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5fb3db21a06..1b3de1a0f5a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,6 @@ kubernetes/helm/**/Chart.lock
.mvn/.develocity/
pinot-integration-tests/src/test/resources/udf-test-results/*.md
-!/pinot-integration-tests/src/test/resources/udf-test-results/README.md
\ No newline at end of file
+!/pinot-integration-tests/src/test/resources/udf-test-results/README.md
+
+tmp/
\ No newline at end of file
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
similarity index 53%
copy from
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
copy to
pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
index 833f96d5507..37456f9dfe8 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
@@ -16,33 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pinot.common.response.server;
-package org.apache.pinot.server.starter.helix;
+import org.apache.pinot.spi.annotations.InterfaceStability;
-public class SegmentReloadStatusValue {
- private final long _totalSegmentCount;
- private final long _successCount;
- private final Long _failureCount;
- public SegmentReloadStatusValue(long totalSegmentCount, long successCount) {
- this(totalSegmentCount, successCount, null);
- }
[email protected]
+public class ApiErrorResponse {
+
+ private String _errorMsg;
+ private String _stacktrace;
- public SegmentReloadStatusValue(long totalSegmentCount, long successCount,
Long failureCount) {
- _totalSegmentCount = totalSegmentCount;
- _successCount = successCount;
- _failureCount = failureCount;
+ public String getErrorMsg() {
+ return _errorMsg;
}
- public long getTotalSegmentCount() {
- return _totalSegmentCount;
+ public ApiErrorResponse setErrorMsg(String errorMsg) {
+ _errorMsg = errorMsg;
+ return this;
}
- public long getSuccessCount() {
- return _successCount;
+ public String getStacktrace() {
+ return _stacktrace;
}
- public Long getFailureCount() {
- return _failureCount;
+ public ApiErrorResponse setStacktrace(String stacktrace) {
+ _stacktrace = stacktrace;
+ return this;
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
new file mode 100644
index 00000000000..442d3da65f0
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.server;
+
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+
+/**
+ * DTO representing a single segment reload failure.
+ */
[email protected]
+public class SegmentReloadFailureResponse {
+ private String _segmentName;
+ private String _serverName;
+ private ApiErrorResponse _error;
+ private long _failedAtMs;
+
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ public SegmentReloadFailureResponse setSegmentName(String segmentName) {
+ _segmentName = segmentName;
+ return this;
+ }
+
+ public String getServerName() {
+ return _serverName;
+ }
+
+ public SegmentReloadFailureResponse setServerName(String serverName) {
+ _serverName = serverName;
+ return this;
+ }
+
+ public ApiErrorResponse getError() {
+ return _error;
+ }
+
+ public SegmentReloadFailureResponse setError(ApiErrorResponse error) {
+ _error = error;
+ return this;
+ }
+
+ public long getFailedAtMs() {
+ return _failedAtMs;
+ }
+
+ public SegmentReloadFailureResponse setFailedAtMs(long failedAtMs) {
+ _failedAtMs = failedAtMs;
+ return this;
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
similarity index 50%
rename from
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
index 833f96d5507..70516dd4589 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
@@ -17,32 +17,56 @@
* under the License.
*/
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.common.response.server;
-public class SegmentReloadStatusValue {
- private final long _totalSegmentCount;
- private final long _successCount;
- private final Long _failureCount;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.InterfaceStability;
- public SegmentReloadStatusValue(long totalSegmentCount, long successCount) {
- this(totalSegmentCount, successCount, null);
- }
- public SegmentReloadStatusValue(long totalSegmentCount, long successCount,
Long failureCount) {
- _totalSegmentCount = totalSegmentCount;
- _successCount = successCount;
- _failureCount = failureCount;
- }
[email protected]
+public class ServerReloadStatusResponse {
+ private long _totalSegmentCount;
+ private int _successCount;
+ private Long _failureCount;
+ private List<SegmentReloadFailureResponse> _segmentReloadFailures;
public long getTotalSegmentCount() {
return _totalSegmentCount;
}
- public long getSuccessCount() {
+ public ServerReloadStatusResponse setTotalSegmentCount(long
totalSegmentCount) {
+ _totalSegmentCount = totalSegmentCount;
+ return this;
+ }
+
+ public int getSuccessCount() {
return _successCount;
}
+ public ServerReloadStatusResponse setSuccessCount(int successCount) {
+ _successCount = successCount;
+ return this;
+ }
+
+ @Nullable
public Long getFailureCount() {
return _failureCount;
}
+
+ public ServerReloadStatusResponse setFailureCount(Long failureCount) {
+ _failureCount = failureCount;
+ return this;
+ }
+
+ @Nullable
+ public List<SegmentReloadFailureResponse> getSampleSegmentReloadFailures() {
+ return _segmentReloadFailures;
+ }
+
+ public ServerReloadStatusResponse setSampleSegmentReloadFailures(
+ List<SegmentReloadFailureResponse> sampleSegmentReloadFailureResponses) {
+ _segmentReloadFailures = sampleSegmentReloadFailureResponses;
+ return this;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
index 828a75aed63..f56f2b2f781 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
@@ -18,6 +18,12 @@
*/
package org.apache.pinot.controller.api.dto;
+import java.util.List;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+
[email protected]
public class PinotTableReloadStatusResponse {
private double _timeElapsedInMinutes;
private double _estimatedTimeRemainingInMinutes;
@@ -27,6 +33,7 @@ public class PinotTableReloadStatusResponse {
private int _totalServerCallsFailed;
private Long _failureCount;
private PinotControllerJobMetadataDto _metadata;
+ private List<SegmentReloadFailureResponse> _segmentReloadFailures;
public int getTotalSegmentCount() {
return _totalSegmentCount;
@@ -101,4 +108,14 @@ public class PinotTableReloadStatusResponse {
_metadata = metadata;
return this;
}
+
+ public List<SegmentReloadFailureResponse> getSegmentReloadFailures() {
+ return _segmentReloadFailures;
+ }
+
+ public PinotTableReloadStatusResponse setSegmentReloadFailures(
+ List<SegmentReloadFailureResponse> segmentReloadFailures) {
+ _segmentReloadFailures = segmentReloadFailures;
+ return this;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
index a1873b1d20b..6e76c96b792 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -35,6 +35,8 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
+import org.apache.pinot.common.response.server.ServerReloadStatusResponse;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.api.dto.PinotControllerJobMetadataDto;
import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
@@ -155,19 +157,29 @@ public class PinotTableReloadStatusReporter {
long totalFailureCount = 0;
boolean hasFailureCountData = false;
+ List<SegmentReloadFailureResponse> allFailedSegments = new ArrayList<>();
+ // Single iteration to aggregate counts and collect failed segments
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
String responseString = streamResponse.getValue();
try {
- PinotTableReloadStatusResponse r =
- JsonUtils.stringToObject(responseString,
PinotTableReloadStatusResponse.class);
- response.setSuccessCount(response.getSuccessCount() +
r.getSuccessCount());
+ ServerReloadStatusResponse serverResponse =
+ JsonUtils.stringToObject(responseString,
ServerReloadStatusResponse.class);
+
+ // Aggregate success count
+ response.setSuccessCount(response.getSuccessCount() +
serverResponse.getSuccessCount());
// Aggregate failure counts if available
- if (r.getFailureCount() != null) {
- totalFailureCount += r.getFailureCount();
+ if (serverResponse.getFailureCount() != null) {
+ totalFailureCount += serverResponse.getFailureCount();
hasFailureCountData = true;
}
+
+ // Collect failed segments
+ if (serverResponse.getSampleSegmentReloadFailures() != null
+ && !serverResponse.getSampleSegmentReloadFailures().isEmpty()) {
+
allFailedSegments.addAll(serverResponse.getSampleSegmentReloadFailures());
+ }
} catch (Exception e) {
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
}
@@ -178,6 +190,19 @@ public class PinotTableReloadStatusReporter {
response.setFailureCount(totalFailureCount);
}
+ // Set failed segments in response if any were collected
+ if (!allFailedSegments.isEmpty()) {
+ // Limit to prevent huge responses (e.g., max 500 failures across all
servers)
+ // This limit is higher than per-server limit since we're aggregating
+ int maxFailuresInResponse = 500;
+ if (allFailedSegments.size() > maxFailuresInResponse) {
+ LOG.warn("Truncating failed segments list from {} to {} for job {}",
+ allFailedSegments.size(), maxFailuresInResponse, reloadJobId);
+ allFailedSegments = allFailedSegments.subList(0,
maxFailuresInResponse);
+ }
+ response.setSegmentReloadFailures(allFailedSegments);
+ }
+
// Add derived fields
final double timeElapsedInMinutes =
computeTimeElapsedInMinutes(reloadJobMetadata.getSubmissionTimeMs());
final double estimatedRemainingTimeInMinutes =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 17d2db31bd6..9c96cfdef62 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -801,7 +801,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
failedSegments.add(segmentName);
sampleException.set(t);
if (reloadJobId != null) {
-
_reloadJobStatusCache.getOrCreate(reloadJobId).incrementAndGetFailureCount();
+ _reloadJobStatusCache.recordFailure(reloadJobId, segmentName, t);
}
}
},
_segmentReloadRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index aaddb8a1a42..7a8f86121ed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -63,6 +63,6 @@ public interface TableDataManagerProvider {
@VisibleForTesting
default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
return getTableDataManager(tableConfig, schema, new
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
- null, null, () -> true, false, new ServerReloadJobStatusCache());
+ null, null, () -> true, false, new
ServerReloadJobStatusCache("testInstance"));
}
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 5dc8a24439f..9619023ba3b 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -198,7 +198,7 @@ public class BenchmarkDimensionTableOverhead extends
BaseQueriesTest {
null,
SEGMENT_OPERATIONS_THROTTLER,
false,
- new ServerReloadJobStatusCache());
+ new ServerReloadJobStatusCache("benchmarkInstance"));
_tableDataManager.start();
for (int i = 0; i < _indexSegments.size(); i++) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
index fc39147d288..5f8d3c33ea0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
@@ -18,21 +18,25 @@
*/
package org.apache.pinot.segment.local.utils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
/**
* Tracks status of a reload job.
- * Phase 1: Only tracks failure count.
+ * Thread-safe for concurrent access.
*/
public class ReloadJobStatus {
private final String _jobId;
- private final AtomicInteger _failureCount;
+ private final AtomicInteger _failureCount = new AtomicInteger(0);
private final long _createdTimeMs;
+ private final List<SegmentReloadFailureResponse> _failedSegmentDetails = new
ArrayList<>();
public ReloadJobStatus(String jobId) {
_jobId = jobId;
- _failureCount = new AtomicInteger(0);
_createdTimeMs = System.currentTimeMillis();
}
@@ -52,9 +56,18 @@ public class ReloadJobStatus {
return _createdTimeMs;
}
+ public synchronized List<SegmentReloadFailureResponse>
getFailedSegmentDetails() {
+ return Collections.unmodifiableList(_failedSegmentDetails);
+ }
+
+ synchronized void addFailureDetail(SegmentReloadFailureResponse
failureResponse) {
+ _failedSegmentDetails.add(failureResponse);
+ }
+
@Override
public String toString() {
return "ReloadJobStatus{jobId='" + _jobId + "', failureCount=" +
_failureCount.get()
+ + ", failedSegmentDetailsCount=" + _failedSegmentDetails.size()
+ ", createdTimeMs=" + _createdTimeMs + '}';
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
index 6c576dca1d3..840aed04934 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration2.MapConfiguration;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.response.server.ApiErrorResponse;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
@@ -38,7 +41,6 @@ import static java.util.Objects.requireNonNull;
/**
* In-memory cache for tracking reload job status on server side.
- * Phase 1: Only tracks failure count per job.
*
* <p>Thread-safe for concurrent access. Uses Guava Cache with LRU eviction
* and time-based expiration.
@@ -53,10 +55,12 @@ public class ServerReloadJobStatusCache implements
PinotClusterConfigChangeListe
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static final String CONFIG_PREFIX = "pinot.server.table.reload.status.cache";
+ private final String _instanceId;
private volatile Cache<String, ReloadJobStatus> _cache;
private volatile ServerReloadJobStatusCacheConfig _currentConfig;
- public ServerReloadJobStatusCache() {
+ public ServerReloadJobStatusCache(String instanceId) {
+ _instanceId = requireNonNull(instanceId, "instanceId cannot be null");
_currentConfig = new ServerReloadJobStatusCacheConfig();
_cache = CacheBuilder.newBuilder()
.maximumSize(_currentConfig.getMaxSize())
@@ -64,7 +68,7 @@ public class ServerReloadJobStatusCache implements
PinotClusterConfigChangeListe
.recordStats()
.build();
- LOG.info("Initialized ReloadJobStatusCache with {}", _currentConfig);
+ LOG.info("Initialized ReloadJobStatusCache for instance {} with {}",
_instanceId, _currentConfig);
}
/**
@@ -103,6 +107,40 @@ public class ServerReloadJobStatusCache implements
PinotClusterConfigChangeListe
return status;
}
+ /**
+ * Records a segment reload failure in the cache.
+ * Handles all business logic: counting, limit enforcement, thread safety.
+ *
+ * <p>This method ALWAYS increments the failure count, but only stores
detailed
+ * failure information (segment name, exception, stack trace) for the first
N failures
+ * where N is configured by maxFailureDetailsToCapture.
+ *
+ * @param jobId reload job ID (UUID)
+ * @param segmentName name of failed segment
+ * @param exception the exception that caused the failure
+ */
+ public void recordFailure(String jobId, String segmentName, Throwable
exception) {
+ requireNonNull(jobId, "jobId cannot be null");
+ requireNonNull(segmentName, "segmentName cannot be null");
+ requireNonNull(exception, "exception cannot be null");
+
+ ReloadJobStatus status = getOrCreate(jobId);
+ status.incrementAndGetFailureCount();
+
+ synchronized (status) {
+ int maxLimit = _currentConfig.getSegmentFailureDetailsCount();
+ if (status.getFailedSegmentDetails().size() < maxLimit) {
+ status.addFailureDetail(new SegmentReloadFailureResponse()
+ .setSegmentName(segmentName)
+ .setServerName(_instanceId)
+ .setError(new ApiErrorResponse()
+ .setErrorMsg(exception.getMessage())
+ .setStacktrace(ExceptionUtils.getStackTrace(exception)))
+ .setFailedAtMs(System.currentTimeMillis()));
+ }
+ }
+ }
+
/**
* Rebuilds the cache with new configuration and migrates existing entries.
* This method is synchronized to prevent concurrent rebuilds.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
index 304e2221b17..8d113fbdf28 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
@@ -34,6 +34,9 @@ public class ServerReloadJobStatusCacheConfig {
@JsonProperty("ttl.days")
private int _ttlDays = 30;
+ @JsonProperty("segment.failure.details.count")
+ private int _segmentFailureDetailsCount = 5;
+
public int getMaxSize() {
return _maxSize;
}
@@ -52,8 +55,18 @@ public class ServerReloadJobStatusCacheConfig {
return this;
}
+ public int getSegmentFailureDetailsCount() {
+ return _segmentFailureDetailsCount;
+ }
+
+ public ServerReloadJobStatusCacheConfig setSegmentFailureDetailsCount(int
segmentFailureDetailsCount) {
+ _segmentFailureDetailsCount = segmentFailureDetailsCount;
+ return this;
+ }
+
@Override
public String toString() {
- return "ServerReloadJobStatusCacheConfig{maxSize=" + _maxSize + ",
ttlDays=" + _ttlDays + '}';
+ return "ServerReloadJobStatusCacheConfig{maxSize=" + _maxSize + ",
ttlDays=" + _ttlDays
+ + ", segmentFailureDetailsCount=" + _segmentFailureDetailsCount + '}';
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
index 1d18ece7489..b459cbfb44a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
@@ -18,9 +18,17 @@
*/
package org.apache.pinot.segment.local.utils;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -35,7 +43,7 @@ public class ServerReloadJobStatusCacheTest {
@Test
public void testDefaultConfigInitialization() {
// Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// Then
ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
@@ -52,7 +60,7 @@ public class ServerReloadJobStatusCacheTest {
properties.put("pinot.server.table.reload.status.cache.ttl.days", "15");
properties.put("some.other.config", "value");
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// When
cache.onChange(properties.keySet(), properties);
@@ -70,7 +78,7 @@ public class ServerReloadJobStatusCacheTest {
properties.put("pinot.server.table.reload.status.cache.size.max", "7500");
properties.put("some.other.config", "value");
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// When
cache.onChange(properties.keySet(), properties);
@@ -89,7 +97,7 @@ public class ServerReloadJobStatusCacheTest {
properties.put("some.other.config", "value");
properties.put("another.config", "123");
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// When
cache.onChange(properties.keySet(), properties);
@@ -106,7 +114,7 @@ public class ServerReloadJobStatusCacheTest {
Map<String, String> properties = new HashMap<>();
properties.put("pinot.server.table.reload.status.cache.size.max",
"invalid");
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
ServerReloadJobStatusCacheConfig oldConfig = cache.getCurrentConfig();
// When - Invalid config should keep old cache
@@ -118,34 +126,10 @@ public class ServerReloadJobStatusCacheTest {
assertThat(config.getMaxSize()).isEqualTo(10000);
}
- @Test
- public void testConfigUpdateOverwritesPrevious() {
- // Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
-
- // Set initial config
- Map<String, String> initialProperties = new HashMap<>();
- initialProperties.put("pinot.server.table.reload.status.cache.size.max",
"8000");
- initialProperties.put("pinot.server.table.reload.status.cache.ttl.days",
"20");
- cache.onChange(initialProperties.keySet(), initialProperties);
- assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(8000);
- assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(20);
-
- // When - Update with new config
- Map<String, String> updatedProperties = new HashMap<>();
- updatedProperties.put("pinot.server.table.reload.status.cache.size.max",
"12000");
- updatedProperties.put("pinot.server.table.reload.status.cache.ttl.days",
"45");
- cache.onChange(updatedProperties.keySet(), updatedProperties);
-
- // Then
- assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(12000);
- assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(45);
- }
-
@Test
public void testZookeeperConfigDeletionRevertsToDefaults() {
// Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// Set initial custom configs
Map<String, String> customProperties = new HashMap<>();
@@ -186,7 +170,7 @@ public class ServerReloadJobStatusCacheTest {
@Test
public void testCacheEntryMigrationOnRebuild() {
// Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
// Add some entries to cache
ReloadJobStatus status1 = cache.getOrCreate("job-1");
@@ -216,44 +200,10 @@ public class ServerReloadJobStatusCacheTest {
assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(15);
}
- @Test
- public void testCacheRebuildWithDifferentSize() {
- // Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
- assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(10000);
-
- // When - Update only max size
- Map<String, String> properties = new HashMap<>();
- properties.put("pinot.server.table.reload.status.cache.size.max", "20000");
- cache.onChange(properties.keySet(), properties);
-
- // Then - Verify new size takes effect
- assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(20000);
- // TTL should remain default
- assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(30);
- }
-
- @Test
- public void testCacheRebuildWithDifferentTTL() {
- // Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
- assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(30);
-
- // When - Update only TTL
- Map<String, String> properties = new HashMap<>();
- properties.put("pinot.server.table.reload.status.cache.ttl.days", "45");
- cache.onChange(properties.keySet(), properties);
-
- // Then - Verify new TTL takes effect
- assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(45);
- // Max size should remain default
- assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(10000);
- }
-
@Test
public void testOnChangeSkipsRebuildWhenNoRelevantConfigsChanged() {
// Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
Map<String, String> initialProperties = new HashMap<>();
initialProperties.put("pinot.server.table.reload.status.cache.size.max",
"8000");
@@ -282,7 +232,7 @@ public class ServerReloadJobStatusCacheTest {
@Test
public void testOnChangeRebuildsWhenRelevantConfigsChanged() {
// Given
- ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCache cache = new
ServerReloadJobStatusCache("testServer");
Map<String, String> initialProperties = new HashMap<>();
initialProperties.put("pinot.server.table.reload.status.cache.size.max",
"8000");
@@ -309,4 +259,205 @@ public class ServerReloadJobStatusCacheTest {
assertThat(configAfter.getMaxSize()).isEqualTo(12000);
assertThat(configAfter.getTtlDays()).isEqualTo(40);
}
+
+ // ========== Tests for recordFailure() and getFailedSegmentDetails() ==========
+
+ @Test
+ public void testRecordFailureCreatesJobIfNotExists() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ String jobId = "job-new";
+ String segmentName = "segment_123";
+ Exception exception = new IOException("Test error");
+
+ // When
+ cache.recordFailure(jobId, segmentName, exception);
+
+ // Then
+ ReloadJobStatus status = cache.getJobStatus(jobId);
+ assertThat(status).isNotNull();
+ assertThat(status.getFailureCount()).isEqualTo(1);
+ assertThat(status.getFailedSegmentDetails()).hasSize(1);
+ assertThat(status.getFailedSegmentDetails().get(0).getSegmentName()).isEqualTo(segmentName);
+ }
+
+ @org.testng.annotations.DataProvider(name = "failureLimits")
+ public Object[][] failureLimitsProvider() {
+ return new Object[][] {
+ {5, 10, "default limit (5)"},
+ {3, 5, "custom limit (3)"},
+ {1, 3, "limit of 1"}
+ };
+ }
+
+ @Test(dataProvider = "failureLimits")
+ public void testRecordFailureRespectsLimit(int limit, int totalFailures, String description) {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ if (limit != 5) {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", String.valueOf(limit));
+ cache.onChange(properties.keySet(), properties);
+ }
+
+ String jobId = "job-limit-" + limit;
+
+ // When - Record failures
+ for (int i = 1; i <= totalFailures; i++) {
+ cache.recordFailure(jobId, "segment_" + i, new IOException("Error " + i));
+ }
+
+ // Then - Count should equal totalFailures, but only first 'limit' details stored
+ ReloadJobStatus status = cache.getJobStatus(jobId);
+ assertThat(status.getFailureCount()).isEqualTo(totalFailures);
+ assertThat(status.getFailedSegmentDetails()).hasSize(limit);
+ assertThat(status.getFailedSegmentDetails().get(0).getSegmentName()).isEqualTo("segment_1");
+ assertThat(status.getFailedSegmentDetails().get(limit - 1).getSegmentName()).isEqualTo("segment_" + limit);
+ }
+
+ @Test
+ public void testRecordFailureConcurrent() throws Exception {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ String jobId = "job-concurrent";
+ int threadCount = 10;
+ int failuresPerThread = 5;
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ // When - Record failures concurrently from multiple threads
+ for (int t = 0; t < threadCount; t++) {
+ int threadId = t;
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < failuresPerThread; i++) {
+ cache.recordFailure(jobId, "segment_t" + threadId + "_" + i,
+ new IOException("Error from thread " + threadId));
+ }
+ }, executor);
+ futures.add(future);
+ }
+
+ // Wait for all threads to complete
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+
+ // Then - All failures should be counted
+ ReloadJobStatus status = cache.getJobStatus(jobId);
+ assertThat(status.getFailureCount()).isEqualTo(threadCount * failuresPerThread);
+ // Only first 5 details stored (default limit)
+ assertThat(status.getFailedSegmentDetails()).hasSize(5);
+ }
+
+ @Test
+ public void testConfigChangeUpdatesMaxFailureDetailsLimit() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ assertThat(cache.getCurrentConfig().getSegmentFailureDetailsCount()).isEqualTo(5); // Default
+
+ // When - Update limit to 2
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", "2");
+ cache.onChange(properties.keySet(), properties);
+
+ // Then - New config should be applied
+ assertThat(cache.getCurrentConfig().getSegmentFailureDetailsCount()).isEqualTo(2);
+
+ // New jobs should use new limit
+ String jobId = "job-new-limit";
+ for (int i = 1; i <= 5; i++) {
+ cache.recordFailure(jobId, "segment_" + i, new IOException("Error " + i));
+ }
+
+ ReloadJobStatus newJobStatus = cache.getJobStatus(jobId);
+ assertThat(newJobStatus.getFailureCount()).isEqualTo(5);
+ assertThat(newJobStatus.getFailedSegmentDetails()).hasSize(2); // New limit applied
+ }
+
+ @Test
+ public void testGetOrCreateConcurrent() throws Exception {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ String jobId = "concurrent-job";
+ int threadCount = 20;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ List<CompletableFuture<ReloadJobStatus>> futures = new ArrayList<>();
+
+ // When - Multiple threads try to create the same job
+ for (int t = 0; t < threadCount; t++) {
+ CompletableFuture<ReloadJobStatus> future = CompletableFuture.supplyAsync(() -> {
+ return cache.getOrCreate(jobId);
+ }, executor);
+ futures.add(future);
+ }
+
+ // Wait for all threads to complete
+ List<ReloadJobStatus> results = new ArrayList<>();
+ for (CompletableFuture<ReloadJobStatus> future : futures) {
+ results.add(future.get(10, TimeUnit.SECONDS));
+ }
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+
+ // Then - All threads should get the exact same instance
+ ReloadJobStatus firstStatus = results.get(0);
+ for (ReloadJobStatus status : results) {
+ assertThat(status).isSameAs(firstStatus);
+ }
+ assertThat(cache.getJobStatus(jobId)).isSameAs(firstStatus);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void testGetFailedSegmentDetailsReturnsUnmodifiableList() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ String jobId = "test-job";
+ cache.recordFailure(jobId, "segment_1", new IOException("Error"));
+
+ // When - Try to modify the returned list
+ ReloadJobStatus status = cache.getJobStatus(jobId);
+ List<SegmentReloadFailureResponse> details = status.getFailedSegmentDetails();
+
+ // Then - Should throw UnsupportedOperationException
+ details.add(new SegmentReloadFailureResponse());
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testRecordFailureWithNullJobId() {
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ cache.recordFailure(null, "segment", new IOException("Error"));
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testRecordFailureWithNullSegmentName() {
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ cache.recordFailure("job-1", null, new IOException("Error"));
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testRecordFailureWithNullException() {
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ cache.recordFailure("job-1", "segment", null);
+ }
+
+ @Test
+ public void testEdgeCaseZeroLimit() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", "0");
+ cache.onChange(properties.keySet(), properties);
+
+ String jobId = "job-zero-limit";
+
+ // When - Record failures with zero limit
+ cache.recordFailure(jobId, "segment_1", new IOException("Error"));
+ cache.recordFailure(jobId, "segment_2", new IOException("Error"));
+
+ // Then - Count should be 2, but no details stored
+ ReloadJobStatus zeroLimitStatus = cache.getJobStatus(jobId);
+ assertThat(zeroLimitStatus.getFailureCount()).isEqualTo(2);
+ assertThat(zeroLimitStatus.getFailedSegmentDetails()).isEmpty();
+ }
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
index fea9227f81b..fc1cf1da447 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiOperation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.GET;
@@ -34,14 +35,13 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.response.server.ServerReloadStatusResponse;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.utils.ReloadJobStatus;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -85,23 +85,24 @@ public class ControllerJobStatusResource {
totalSegmentCount = targetSegments.size();
}
try {
- long successCount = 0;
+ int successCount = 0;
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
successCount++;
}
}
- // Query cache for failure count if reloadJobId is provided
- Long failureCount = null;
- if (reloadJobId != null) {
- ReloadJobStatus jobStatus = _serverReloadJobStatusCache.getJobStatus(reloadJobId);
- if (jobStatus != null) {
- failureCount = (long) jobStatus.getFailureCount();
- }
- }
+ ServerReloadStatusResponse response = new ServerReloadStatusResponse()
+ .setTotalSegmentCount(totalSegmentCount)
+ .setSuccessCount(successCount);
+
+ Optional.ofNullable(reloadJobId)
+ .map(_serverReloadJobStatusCache::getJobStatus)
+ .ifPresent(jobStatus -> response
+ .setFailureCount((long) jobStatus.getFailureCount())
+ .setSampleSegmentReloadFailures(jobStatus.getFailedSegmentDetails()));
- return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount, failureCount));
+ return JsonUtils.objectToString(response);
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
tableDataManager.releaseSegment(segmentDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index f658b1bb56c..02a409bbd5f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -627,7 +627,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
ServerMetrics.register(_serverMetrics);
LOGGER.info("Initializing reload job status cache");
- _reloadJobStatusCache = new ServerReloadJobStatusCache();
+ _reloadJobStatusCache = new ServerReloadJobStatusCache(_instanceId);
// Register cache as cluster config listener for dynamic config updates
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_reloadJobStatusCache);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]