This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b918196a222 Add segment-level failure details capture to reload status 
tracking (#17234)
b918196a222 is described below

commit b918196a222a4383d8b59d7e81e392f7c8ff6236
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Sat Nov 22 10:45:32 2025 -0800

    Add segment-level failure details capture to reload status tracking (#17234)
---
 .gitignore                                         |   4 +-
 .../common/response/server/ApiErrorResponse.java   |  35 ++-
 .../server/SegmentReloadFailureResponse.java       |  69 +++++
 .../server/ServerReloadStatusResponse.java         |  52 +++-
 .../api/dto/PinotTableReloadStatusResponse.java    |  17 ++
 .../services/PinotTableReloadStatusReporter.java   |  35 ++-
 .../core/data/manager/BaseTableDataManager.java    |   2 +-
 .../manager/provider/TableDataManagerProvider.java |   2 +-
 .../perf/BenchmarkDimensionTableOverhead.java      |   2 +-
 .../pinot/segment/local/utils/ReloadJobStatus.java |  19 +-
 .../local/utils/ServerReloadJobStatusCache.java    |  44 +++-
 .../utils/ServerReloadJobStatusCacheConfig.java    |  15 +-
 .../utils/ServerReloadJobStatusCacheTest.java      | 285 ++++++++++++++++-----
 .../api/resources/ControllerJobStatusResource.java |  25 +-
 .../server/starter/helix/BaseServerStarter.java    |   2 +-
 15 files changed, 480 insertions(+), 128 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5fb3db21a06..1b3de1a0f5a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,6 @@ kubernetes/helm/**/Chart.lock
 .mvn/.develocity/
 
 pinot-integration-tests/src/test/resources/udf-test-results/*.md
-!/pinot-integration-tests/src/test/resources/udf-test-results/README.md
\ No newline at end of file
+!/pinot-integration-tests/src/test/resources/udf-test-results/README.md
+
+tmp/
\ No newline at end of file
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
similarity index 53%
copy from 
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
copy to 
pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
index 833f96d5507..37456f9dfe8 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ApiErrorResponse.java
@@ -16,33 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pinot.common.response.server;
 
-package org.apache.pinot.server.starter.helix;
+import org.apache.pinot.spi.annotations.InterfaceStability;
 
-public class SegmentReloadStatusValue {
-  private final long _totalSegmentCount;
-  private final long _successCount;
-  private final Long _failureCount;
 
-  public SegmentReloadStatusValue(long totalSegmentCount, long successCount) {
-    this(totalSegmentCount, successCount, null);
-  }
+@InterfaceStability.Unstable
+public class ApiErrorResponse {
+
+  private String _errorMsg;
+  private String _stacktrace;
 
-  public SegmentReloadStatusValue(long totalSegmentCount, long successCount, 
Long failureCount) {
-    _totalSegmentCount = totalSegmentCount;
-    _successCount = successCount;
-    _failureCount = failureCount;
+  public String getErrorMsg() {
+    return _errorMsg;
   }
 
-  public long getTotalSegmentCount() {
-    return _totalSegmentCount;
+  public ApiErrorResponse setErrorMsg(String errorMsg) {
+    _errorMsg = errorMsg;
+    return this;
   }
 
-  public long getSuccessCount() {
-    return _successCount;
+  public String getStacktrace() {
+    return _stacktrace;
   }
 
-  public Long getFailureCount() {
-    return _failureCount;
+  public ApiErrorResponse setStacktrace(String stacktrace) {
+    _stacktrace = stacktrace;
+    return this;
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
new file mode 100644
index 00000000000..442d3da65f0
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/SegmentReloadFailureResponse.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.server;
+
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+
+/**
+ * DTO representing a single segment reload failure.
+ */
+@InterfaceStability.Unstable
+public class SegmentReloadFailureResponse {
+  private String _segmentName;
+  private String _serverName;
+  private ApiErrorResponse _error;
+  private long _failedAtMs;
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public SegmentReloadFailureResponse setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+    return this;
+  }
+
+  public String getServerName() {
+    return _serverName;
+  }
+
+  public SegmentReloadFailureResponse setServerName(String serverName) {
+    _serverName = serverName;
+    return this;
+  }
+
+  public ApiErrorResponse getError() {
+    return _error;
+  }
+
+  public SegmentReloadFailureResponse setError(ApiErrorResponse error) {
+    _error = error;
+    return this;
+  }
+
+  public long getFailedAtMs() {
+    return _failedAtMs;
+  }
+
+  public SegmentReloadFailureResponse setFailedAtMs(long failedAtMs) {
+    _failedAtMs = failedAtMs;
+    return this;
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
similarity index 50%
rename from 
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
index 833f96d5507..70516dd4589 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadStatusValue.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/server/ServerReloadStatusResponse.java
@@ -17,32 +17,56 @@
  * under the License.
  */
 
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.common.response.server;
 
-public class SegmentReloadStatusValue {
-  private final long _totalSegmentCount;
-  private final long _successCount;
-  private final Long _failureCount;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.InterfaceStability;
 
-  public SegmentReloadStatusValue(long totalSegmentCount, long successCount) {
-    this(totalSegmentCount, successCount, null);
-  }
 
-  public SegmentReloadStatusValue(long totalSegmentCount, long successCount, 
Long failureCount) {
-    _totalSegmentCount = totalSegmentCount;
-    _successCount = successCount;
-    _failureCount = failureCount;
-  }
+@InterfaceStability.Unstable
+public class ServerReloadStatusResponse {
+  private long _totalSegmentCount;
+  private int _successCount;
+  private Long _failureCount;
+  private List<SegmentReloadFailureResponse> _segmentReloadFailures;
 
   public long getTotalSegmentCount() {
     return _totalSegmentCount;
   }
 
-  public long getSuccessCount() {
+  public ServerReloadStatusResponse setTotalSegmentCount(long 
totalSegmentCount) {
+    _totalSegmentCount = totalSegmentCount;
+    return this;
+  }
+
+  public int getSuccessCount() {
     return _successCount;
   }
 
+  public ServerReloadStatusResponse setSuccessCount(int successCount) {
+    _successCount = successCount;
+    return this;
+  }
+
+  @Nullable
   public Long getFailureCount() {
     return _failureCount;
   }
+
+  public ServerReloadStatusResponse setFailureCount(Long failureCount) {
+    _failureCount = failureCount;
+    return this;
+  }
+
+  @Nullable
+  public List<SegmentReloadFailureResponse> getSampleSegmentReloadFailures() {
+    return _segmentReloadFailures;
+  }
+
+  public ServerReloadStatusResponse setSampleSegmentReloadFailures(
+      List<SegmentReloadFailureResponse> sampleSegmentReloadFailureResponses) {
+    _segmentReloadFailures = sampleSegmentReloadFailureResponses;
+    return this;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
index 828a75aed63..f56f2b2f781 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pinot.controller.api.dto;
 
+import java.util.List;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+
+@InterfaceStability.Unstable
 public class PinotTableReloadStatusResponse {
   private double _timeElapsedInMinutes;
   private double _estimatedTimeRemainingInMinutes;
@@ -27,6 +33,7 @@ public class PinotTableReloadStatusResponse {
   private int _totalServerCallsFailed;
   private Long _failureCount;
   private PinotControllerJobMetadataDto _metadata;
+  private List<SegmentReloadFailureResponse> _segmentReloadFailures;
 
   public int getTotalSegmentCount() {
     return _totalSegmentCount;
@@ -101,4 +108,14 @@ public class PinotTableReloadStatusResponse {
     _metadata = metadata;
     return this;
   }
+
+  public List<SegmentReloadFailureResponse> getSegmentReloadFailures() {
+    return _segmentReloadFailures;
+  }
+
+  public PinotTableReloadStatusResponse setSegmentReloadFailures(
+      List<SegmentReloadFailureResponse> segmentReloadFailures) {
+    _segmentReloadFailures = segmentReloadFailures;
+    return this;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
index a1873b1d20b..6e76c96b792 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -35,6 +35,8 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
+import org.apache.pinot.common.response.server.ServerReloadStatusResponse;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.api.dto.PinotControllerJobMetadataDto;
 import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
@@ -155,19 +157,29 @@ public class PinotTableReloadStatusReporter {
 
     long totalFailureCount = 0;
     boolean hasFailureCountData = false;
+    List<SegmentReloadFailureResponse> allFailedSegments = new ArrayList<>();
 
+    // Single iteration to aggregate counts and collect failed segments
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       String responseString = streamResponse.getValue();
       try {
-        PinotTableReloadStatusResponse r =
-            JsonUtils.stringToObject(responseString, 
PinotTableReloadStatusResponse.class);
-        response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
+        ServerReloadStatusResponse serverResponse =
+            JsonUtils.stringToObject(responseString, 
ServerReloadStatusResponse.class);
+
+        // Aggregate success count
+        response.setSuccessCount(response.getSuccessCount() + 
serverResponse.getSuccessCount());
 
         // Aggregate failure counts if available
-        if (r.getFailureCount() != null) {
-          totalFailureCount += r.getFailureCount();
+        if (serverResponse.getFailureCount() != null) {
+          totalFailureCount += serverResponse.getFailureCount();
           hasFailureCountData = true;
         }
+
+        // Collect failed segments
+        if (serverResponse.getSampleSegmentReloadFailures() != null
+            && !serverResponse.getSampleSegmentReloadFailures().isEmpty()) {
+          
allFailedSegments.addAll(serverResponse.getSampleSegmentReloadFailures());
+        }
       } catch (Exception e) {
         
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
       }
@@ -178,6 +190,19 @@ public class PinotTableReloadStatusReporter {
       response.setFailureCount(totalFailureCount);
     }
 
+    // Set failed segments in response if any were collected
+    if (!allFailedSegments.isEmpty()) {
+      // Limit to prevent huge responses (e.g., max 500 failures across all 
servers)
+      // This limit is higher than per-server limit since we're aggregating
+      int maxFailuresInResponse = 500;
+      if (allFailedSegments.size() > maxFailuresInResponse) {
+        LOG.warn("Truncating failed segments list from {} to {} for job {}",
+            allFailedSegments.size(), maxFailuresInResponse, reloadJobId);
+        allFailedSegments = allFailedSegments.subList(0, 
maxFailuresInResponse);
+      }
+      response.setSegmentReloadFailures(allFailedSegments);
+    }
+
     // Add derived fields
     final double timeElapsedInMinutes = 
computeTimeElapsedInMinutes(reloadJobMetadata.getSubmissionTimeMs());
     final double estimatedRemainingTimeInMinutes =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 17d2db31bd6..9c96cfdef62 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -801,7 +801,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         failedSegments.add(segmentName);
         sampleException.set(t);
         if (reloadJobId != null) {
-          
_reloadJobStatusCache.getOrCreate(reloadJobId).incrementAndGetFailureCount();
+          _reloadJobStatusCache.recordFailure(reloadJobId, segmentName, t);
         }
       }
     }, 
_segmentReloadRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index aaddb8a1a42..7a8f86121ed 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -63,6 +63,6 @@ public interface TableDataManagerProvider {
   @VisibleForTesting
   default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
     return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
-        null, null, () -> true, false, new ServerReloadJobStatusCache());
+        null, null, () -> true, false, new 
ServerReloadJobStatusCache("testInstance"));
   }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 5dc8a24439f..9619023ba3b 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -198,7 +198,7 @@ public class BenchmarkDimensionTableOverhead extends 
BaseQueriesTest {
         null,
         SEGMENT_OPERATIONS_THROTTLER,
         false,
-        new ServerReloadJobStatusCache());
+        new ServerReloadJobStatusCache("benchmarkInstance"));
     _tableDataManager.start();
 
     for (int i = 0; i < _indexSegments.size(); i++) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
index fc39147d288..5f8d3c33ea0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ReloadJobStatus.java
@@ -18,21 +18,25 @@
  */
 package org.apache.pinot.segment.local.utils;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
 
 
 /**
  * Tracks status of a reload job.
- * Phase 1: Only tracks failure count.
+ * Thread-safe for concurrent access.
  */
 public class ReloadJobStatus {
   private final String _jobId;
-  private final AtomicInteger _failureCount;
+  private final AtomicInteger _failureCount = new AtomicInteger(0);
   private final long _createdTimeMs;
+  private final List<SegmentReloadFailureResponse> _failedSegmentDetails = new 
ArrayList<>();
 
   public ReloadJobStatus(String jobId) {
     _jobId = jobId;
-    _failureCount = new AtomicInteger(0);
     _createdTimeMs = System.currentTimeMillis();
   }
 
@@ -52,9 +56,18 @@ public class ReloadJobStatus {
     return _createdTimeMs;
   }
 
+  public synchronized List<SegmentReloadFailureResponse> 
getFailedSegmentDetails() {
+    return Collections.unmodifiableList(_failedSegmentDetails);
+  }
+
+  synchronized void addFailureDetail(SegmentReloadFailureResponse 
failureResponse) {
+    _failedSegmentDetails.add(failureResponse);
+  }
+
   @Override
   public String toString() {
     return "ReloadJobStatus{jobId='" + _jobId + "', failureCount=" + 
_failureCount.get()
+        + ", failedSegmentDetailsCount=" + _failedSegmentDetails.size()
         + ", createdTimeMs=" + _createdTimeMs + '}';
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
index 6c576dca1d3..840aed04934 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration2.MapConfiguration;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.response.server.ApiErrorResponse;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
 import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
@@ -38,7 +41,6 @@ import static java.util.Objects.requireNonNull;
 
 /**
  * In-memory cache for tracking reload job status on server side.
- * Phase 1: Only tracks failure count per job.
  *
  * <p>Thread-safe for concurrent access. Uses Guava Cache with LRU eviction
  * and time-based expiration.
@@ -53,10 +55,12 @@ public class ServerReloadJobStatusCache implements 
PinotClusterConfigChangeListe
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   static final String CONFIG_PREFIX = "pinot.server.table.reload.status.cache";
 
+  private final String _instanceId;
   private volatile Cache<String, ReloadJobStatus> _cache;
   private volatile ServerReloadJobStatusCacheConfig _currentConfig;
 
-  public ServerReloadJobStatusCache() {
+  public ServerReloadJobStatusCache(String instanceId) {
+    _instanceId = requireNonNull(instanceId, "instanceId cannot be null");
     _currentConfig = new ServerReloadJobStatusCacheConfig();
     _cache = CacheBuilder.newBuilder()
         .maximumSize(_currentConfig.getMaxSize())
@@ -64,7 +68,7 @@ public class ServerReloadJobStatusCache implements 
PinotClusterConfigChangeListe
         .recordStats()
         .build();
 
-    LOG.info("Initialized ReloadJobStatusCache with {}", _currentConfig);
+    LOG.info("Initialized ReloadJobStatusCache for instance {} with {}", 
_instanceId, _currentConfig);
   }
 
   /**
@@ -103,6 +107,40 @@ public class ServerReloadJobStatusCache implements 
PinotClusterConfigChangeListe
     return status;
   }
 
+  /**
+   * Records a segment reload failure in the cache.
+   * Handles all business logic: counting, limit enforcement, thread safety.
+   *
+   * <p>This method ALWAYS increments the failure count, but only stores 
detailed
+   * failure information (segment name, exception, stack trace) for the first 
N failures
+   * where N is configured by maxFailureDetailsToCapture.
+   *
+   * @param jobId reload job ID (UUID)
+   * @param segmentName name of failed segment
+   * @param exception the exception that caused the failure
+   */
+  public void recordFailure(String jobId, String segmentName, Throwable 
exception) {
+    requireNonNull(jobId, "jobId cannot be null");
+    requireNonNull(segmentName, "segmentName cannot be null");
+    requireNonNull(exception, "exception cannot be null");
+
+    ReloadJobStatus status = getOrCreate(jobId);
+    status.incrementAndGetFailureCount();
+
+    synchronized (status) {
+      int maxLimit = _currentConfig.getSegmentFailureDetailsCount();
+      if (status.getFailedSegmentDetails().size() < maxLimit) {
+        status.addFailureDetail(new SegmentReloadFailureResponse()
+            .setSegmentName(segmentName)
+            .setServerName(_instanceId)
+            .setError(new ApiErrorResponse()
+                .setErrorMsg(exception.getMessage())
+                .setStacktrace(ExceptionUtils.getStackTrace(exception)))
+            .setFailedAtMs(System.currentTimeMillis()));
+      }
+    }
+  }
+
   /**
    * Rebuilds the cache with new configuration and migrates existing entries.
    * This method is synchronized to prevent concurrent rebuilds.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
index 304e2221b17..8d113fbdf28 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheConfig.java
@@ -34,6 +34,9 @@ public class ServerReloadJobStatusCacheConfig {
   @JsonProperty("ttl.days")
   private int _ttlDays = 30;
 
+  @JsonProperty("segment.failure.details.count")
+  private int _segmentFailureDetailsCount = 5;
+
   public int getMaxSize() {
     return _maxSize;
   }
@@ -52,8 +55,18 @@ public class ServerReloadJobStatusCacheConfig {
     return this;
   }
 
+  public int getSegmentFailureDetailsCount() {
+    return _segmentFailureDetailsCount;
+  }
+
+  public ServerReloadJobStatusCacheConfig setSegmentFailureDetailsCount(int 
segmentFailureDetailsCount) {
+    _segmentFailureDetailsCount = segmentFailureDetailsCount;
+    return this;
+  }
+
   @Override
   public String toString() {
-    return "ServerReloadJobStatusCacheConfig{maxSize=" + _maxSize + ", 
ttlDays=" + _ttlDays + '}';
+    return "ServerReloadJobStatusCacheConfig{maxSize=" + _maxSize + ", 
ttlDays=" + _ttlDays
+        + ", segmentFailureDetailsCount=" + _segmentFailureDetailsCount + '}';
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
index 1d18ece7489..b459cbfb44a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java
@@ -18,9 +18,17 @@
  */
 package org.apache.pinot.segment.local.utils;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.response.server.SegmentReloadFailureResponse;
 import org.testng.annotations.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -35,7 +43,7 @@ public class ServerReloadJobStatusCacheTest {
   @Test
   public void testDefaultConfigInitialization() {
     // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // Then
     ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
@@ -52,7 +60,7 @@ public class ServerReloadJobStatusCacheTest {
     properties.put("pinot.server.table.reload.status.cache.ttl.days", "15");
     properties.put("some.other.config", "value");
 
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // When
     cache.onChange(properties.keySet(), properties);
@@ -70,7 +78,7 @@ public class ServerReloadJobStatusCacheTest {
     properties.put("pinot.server.table.reload.status.cache.size.max", "7500");
     properties.put("some.other.config", "value");
 
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // When
     cache.onChange(properties.keySet(), properties);
@@ -89,7 +97,7 @@ public class ServerReloadJobStatusCacheTest {
     properties.put("some.other.config", "value");
     properties.put("another.config", "123");
 
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // When
     cache.onChange(properties.keySet(), properties);
@@ -106,7 +114,7 @@ public class ServerReloadJobStatusCacheTest {
     Map<String, String> properties = new HashMap<>();
     properties.put("pinot.server.table.reload.status.cache.size.max", 
"invalid");
 
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
     ServerReloadJobStatusCacheConfig oldConfig = cache.getCurrentConfig();
 
     // When - Invalid config should keep old cache
@@ -118,34 +126,10 @@ public class ServerReloadJobStatusCacheTest {
     assertThat(config.getMaxSize()).isEqualTo(10000);
   }
 
-  @Test
-  public void testConfigUpdateOverwritesPrevious() {
-    // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
-
-    // Set initial config
-    Map<String, String> initialProperties = new HashMap<>();
-    initialProperties.put("pinot.server.table.reload.status.cache.size.max", 
"8000");
-    initialProperties.put("pinot.server.table.reload.status.cache.ttl.days", 
"20");
-    cache.onChange(initialProperties.keySet(), initialProperties);
-    assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(8000);
-    assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(20);
-
-    // When - Update with new config
-    Map<String, String> updatedProperties = new HashMap<>();
-    updatedProperties.put("pinot.server.table.reload.status.cache.size.max", 
"12000");
-    updatedProperties.put("pinot.server.table.reload.status.cache.ttl.days", 
"45");
-    cache.onChange(updatedProperties.keySet(), updatedProperties);
-
-    // Then
-    assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(12000);
-    assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(45);
-  }
-
   @Test
   public void testZookeeperConfigDeletionRevertsToDefaults() {
     // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // Set initial custom configs
     Map<String, String> customProperties = new HashMap<>();
@@ -186,7 +170,7 @@ public class ServerReloadJobStatusCacheTest {
   @Test
   public void testCacheEntryMigrationOnRebuild() {
     // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     // Add some entries to cache
     ReloadJobStatus status1 = cache.getOrCreate("job-1");
@@ -216,44 +200,10 @@ public class ServerReloadJobStatusCacheTest {
     assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(15);
   }
 
-  @Test
-  public void testCacheRebuildWithDifferentSize() {
-    // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
-    assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(10000);
-
-    // When - Update only max size
-    Map<String, String> properties = new HashMap<>();
-    properties.put("pinot.server.table.reload.status.cache.size.max", "20000");
-    cache.onChange(properties.keySet(), properties);
-
-    // Then - Verify new size takes effect
-    assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(20000);
-    // TTL should remain default
-    assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(30);
-  }
-
-  @Test
-  public void testCacheRebuildWithDifferentTTL() {
-    // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
-    assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(30);
-
-    // When - Update only TTL
-    Map<String, String> properties = new HashMap<>();
-    properties.put("pinot.server.table.reload.status.cache.ttl.days", "45");
-    cache.onChange(properties.keySet(), properties);
-
-    // Then - Verify new TTL takes effect
-    assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(45);
-    // Max size should remain default
-    assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(10000);
-  }
-
   @Test
   public void testOnChangeSkipsRebuildWhenNoRelevantConfigsChanged() {
     // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new 
ServerReloadJobStatusCache("testServer");
 
     Map<String, String> initialProperties = new HashMap<>();
     initialProperties.put("pinot.server.table.reload.status.cache.size.max", 
"8000");
@@ -282,7 +232,7 @@ public class ServerReloadJobStatusCacheTest {
   @Test
   public void testOnChangeRebuildsWhenRelevantConfigsChanged() {
     // Given
-    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
 
     Map<String, String> initialProperties = new HashMap<>();
     initialProperties.put("pinot.server.table.reload.status.cache.size.max", "8000");
@@ -309,4 +259,205 @@ public class ServerReloadJobStatusCacheTest {
     assertThat(configAfter.getMaxSize()).isEqualTo(12000);
     assertThat(configAfter.getTtlDays()).isEqualTo(40);
   }
+
+  // ========== Tests for recordFailure() and getFailedSegmentDetails() ==========
+
+  @Test
+  public void testRecordFailureCreatesJobIfNotExists() {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    String jobId = "job-new";
+    String segmentName = "segment_123";
+    Exception exception = new IOException("Test error");
+
+    // When
+    cache.recordFailure(jobId, segmentName, exception);
+
+    // Then
+    ReloadJobStatus status = cache.getJobStatus(jobId);
+    assertThat(status).isNotNull();
+    assertThat(status.getFailureCount()).isEqualTo(1);
+    assertThat(status.getFailedSegmentDetails()).hasSize(1);
+    assertThat(status.getFailedSegmentDetails().get(0).getSegmentName()).isEqualTo(segmentName);
+  }
+
+  @org.testng.annotations.DataProvider(name = "failureLimits")
+  public Object[][] failureLimitsProvider() {
+    return new Object[][] {
+        {5, 10, "default limit (5)"},
+        {3, 5, "custom limit (3)"},
+        {1, 3, "limit of 1"}
+    };
+  }
+
+  @Test(dataProvider = "failureLimits")
+  public void testRecordFailureRespectsLimit(int limit, int totalFailures, String description) {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    if (limit != 5) {
+      Map<String, String> properties = new HashMap<>();
+      properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", String.valueOf(limit));
+      cache.onChange(properties.keySet(), properties);
+    }
+
+    String jobId = "job-limit-" + limit;
+
+    // When - Record failures
+    for (int i = 1; i <= totalFailures; i++) {
+      cache.recordFailure(jobId, "segment_" + i, new IOException("Error " + i));
+    }
+
+    // Then - Count should equal totalFailures, but only first 'limit' details stored
+    ReloadJobStatus status = cache.getJobStatus(jobId);
+    assertThat(status.getFailureCount()).isEqualTo(totalFailures);
+    assertThat(status.getFailedSegmentDetails()).hasSize(limit);
+    assertThat(status.getFailedSegmentDetails().get(0).getSegmentName()).isEqualTo("segment_1");
+    assertThat(status.getFailedSegmentDetails().get(limit - 1).getSegmentName()).isEqualTo("segment_" + limit);
+  }
+
+  @Test
+  public void testRecordFailureConcurrent() throws Exception {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    String jobId = "job-concurrent";
+    int threadCount = 10;
+    int failuresPerThread = 5;
+
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+    // When - Record failures concurrently from multiple threads
+    for (int t = 0; t < threadCount; t++) {
+      int threadId = t;
+      CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+        for (int i = 0; i < failuresPerThread; i++) {
+          cache.recordFailure(jobId, "segment_t" + threadId + "_" + i,
+              new IOException("Error from thread " + threadId));
+        }
+      }, executor);
+      futures.add(future);
+    }
+
+    // Wait for all threads to complete
+    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+    executor.shutdown();
+    executor.awaitTermination(5, TimeUnit.SECONDS);
+
+    // Then - All failures should be counted
+    ReloadJobStatus status = cache.getJobStatus(jobId);
+    assertThat(status.getFailureCount()).isEqualTo(threadCount * failuresPerThread);
+    // Only first 5 details stored (default limit)
+    assertThat(status.getFailedSegmentDetails()).hasSize(5);
+  }
+
+  @Test
+  public void testConfigChangeUpdatesMaxFailureDetailsLimit() {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    assertThat(cache.getCurrentConfig().getSegmentFailureDetailsCount()).isEqualTo(5);  // Default
+
+    // When - Update limit to 2
+    Map<String, String> properties = new HashMap<>();
+    properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", "2");
+    cache.onChange(properties.keySet(), properties);
+
+    // Then - New config should be applied
+    assertThat(cache.getCurrentConfig().getSegmentFailureDetailsCount()).isEqualTo(2);
+
+    // New jobs should use new limit
+    String jobId = "job-new-limit";
+    for (int i = 1; i <= 5; i++) {
+      cache.recordFailure(jobId, "segment_" + i, new IOException("Error " + i));
+    }
+
+    ReloadJobStatus newJobStatus = cache.getJobStatus(jobId);
+    assertThat(newJobStatus.getFailureCount()).isEqualTo(5);
+    assertThat(newJobStatus.getFailedSegmentDetails()).hasSize(2);  // New limit applied
+  }
+
+  @Test
+  public void testGetOrCreateConcurrent() throws Exception {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    String jobId = "concurrent-job";
+    int threadCount = 20;
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    List<CompletableFuture<ReloadJobStatus>> futures = new ArrayList<>();
+
+    // When - Multiple threads try to create the same job
+    for (int t = 0; t < threadCount; t++) {
+      CompletableFuture<ReloadJobStatus> future = CompletableFuture.supplyAsync(() -> {
+        return cache.getOrCreate(jobId);
+      }, executor);
+      futures.add(future);
+    }
+
+    // Wait for all threads to complete
+    List<ReloadJobStatus> results = new ArrayList<>();
+    for (CompletableFuture<ReloadJobStatus> future : futures) {
+      results.add(future.get(10, TimeUnit.SECONDS));
+    }
+    executor.shutdown();
+    executor.awaitTermination(5, TimeUnit.SECONDS);
+
+    // Then - All threads should get the exact same instance
+    ReloadJobStatus firstStatus = results.get(0);
+    for (ReloadJobStatus status : results) {
+      assertThat(status).isSameAs(firstStatus);
+    }
+    assertThat(cache.getJobStatus(jobId)).isSameAs(firstStatus);
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void testGetFailedSegmentDetailsReturnsUnmodifiableList() {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    String jobId = "test-job";
+    cache.recordFailure(jobId, "segment_1", new IOException("Error"));
+
+    // When - Try to modify the returned list
+    ReloadJobStatus status = cache.getJobStatus(jobId);
+    List<SegmentReloadFailureResponse> details = status.getFailedSegmentDetails();
+
+    // Then - Should throw UnsupportedOperationException
+    details.add(new SegmentReloadFailureResponse());
+  }
+
+  @Test(expectedExceptions = NullPointerException.class)
+  public void testRecordFailureWithNullJobId() {
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    cache.recordFailure(null, "segment", new IOException("Error"));
+  }
+
+  @Test(expectedExceptions = NullPointerException.class)
+  public void testRecordFailureWithNullSegmentName() {
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    cache.recordFailure("job-1", null, new IOException("Error"));
+  }
+
+  @Test(expectedExceptions = NullPointerException.class)
+  public void testRecordFailureWithNullException() {
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    cache.recordFailure("job-1", "segment", null);
+  }
+
+  @Test
+  public void testEdgeCaseZeroLimit() {
+    // Given
+    ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache("testServer");
+    Map<String, String> properties = new HashMap<>();
+    properties.put("pinot.server.table.reload.status.cache.segment.failure.details.count", "0");
+    cache.onChange(properties.keySet(), properties);
+
+    String jobId = "job-zero-limit";
+
+    // When - Record failures with zero limit
+    cache.recordFailure(jobId, "segment_1", new IOException("Error"));
+    cache.recordFailure(jobId, "segment_2", new IOException("Error"));
+
+    // Then - Count should be 2, but no details stored
+    ReloadJobStatus zeroLimitStatus = cache.getJobStatus(jobId);
+    assertThat(zeroLimitStatus.getFailureCount()).isEqualTo(2);
+    assertThat(zeroLimitStatus.getFailedSegmentDetails()).isEmpty();
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
index fea9227f81b..fc1cf1da447 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiOperation;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.ws.rs.GET;
@@ -34,14 +35,13 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.response.server.ServerReloadStatusResponse;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.utils.ReloadJobStatus;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -85,23 +85,24 @@ public class ControllerJobStatusResource {
       totalSegmentCount = targetSegments.size();
     }
     try {
-      long successCount = 0;
+      int successCount = 0;
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
           successCount++;
         }
       }
 
-      // Query cache for failure count if reloadJobId is provided
-      Long failureCount = null;
-      if (reloadJobId != null) {
-        ReloadJobStatus jobStatus = _serverReloadJobStatusCache.getJobStatus(reloadJobId);
-        if (jobStatus != null) {
-          failureCount = (long) jobStatus.getFailureCount();
-        }
-      }
+      ServerReloadStatusResponse response = new ServerReloadStatusResponse()
+          .setTotalSegmentCount(totalSegmentCount)
+          .setSuccessCount(successCount);
+
+      Optional.ofNullable(reloadJobId)
+          .map(_serverReloadJobStatusCache::getJobStatus)
+          .ifPresent(jobStatus -> response
+              .setFailureCount((long) jobStatus.getFailureCount())
+              .setSampleSegmentReloadFailures(jobStatus.getFailedSegmentDetails()));
 
-      return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount, failureCount));
+      return JsonUtils.objectToString(response);
     } finally {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         tableDataManager.releaseSegment(segmentDataManager);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index f658b1bb56c..02a409bbd5f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -627,7 +627,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
     ServerMetrics.register(_serverMetrics);
 
     LOGGER.info("Initializing reload job status cache");
-    _reloadJobStatusCache = new ServerReloadJobStatusCache();
+    _reloadJobStatusCache = new ServerReloadJobStatusCache(_instanceId);
 
     // Register cache as cluster config listener for dynamic config updates
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(_reloadJobStatusCache);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to