This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f5466035ac Fix ResponseStoreCleaner bugs causing unbounded response store growth and cleanup failures (#17622)
3f5466035ac is described below

commit 3f5466035ac21683f4cbdb2c3bb710f815bb8476
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 5 11:02:55 2026 -0800

    Fix ResponseStoreCleaner bugs causing unbounded response store growth and cleanup failures (#17622)
---
 .../controller/cursors/ResponseStoreCleaner.java   | 164 +++++++---
 .../cursors/ResponseStoreCleanerTest.java          | 333 +++++++++++++++++++++
 2 files changed, 452 insertions(+), 45 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
index e2a5fc26465..b7bd65667d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
@@ -29,13 +29,10 @@ import java.util.Properties;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.classic.methods.HttpDelete;
 import org.apache.hc.client5.http.classic.methods.HttpGet;
-import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.helix.model.InstanceConfig;
@@ -66,7 +63,9 @@ import org.slf4j.LoggerFactory;
  */
 public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
-  private static final int TIMEOUT_MS = 3000;
+  // Increased timeout to handle large response stores
+  private static final int GET_TIMEOUT_MS = 60_000;
+  private static final int DELETE_TIMEOUT_MS = 10_000;
   private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
   private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
   // Used in tests to trigger the delete instead of waiting for the wall clock to move to an appropriate time.
@@ -132,32 +131,104 @@ public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
               Integer.parseInt(HelixHelper.getGrpcPort(broker))));
     }
 
+    Map<String, String> requestHeaders;
     try {
-      Map<String, String> requestHeaders = 
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+      requestHeaders = AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create auth headers for response store cleanup", 
e);
+      return;
+    }
 
-      Map<String, List<CursorResponseNative>> brokerCursorsMap = 
getAllQueryResults(brokers, requestHeaders);
+    Map<String, List<CursorResponseNative>> brokerCursorsMap;
+    try {
+      brokerCursorsMap = getAllQueryResults(brokers, requestHeaders);
+    } catch (Exception e) {
+      LOGGER.error("Failed to get query results from brokers for cleanup", e);
+      return;
+    }
+
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    int portOverride = _controllerConf.getControllerBrokerPortOverride();
 
-      String protocol = _controllerConf.getControllerBrokerProtocol();
-      int portOverride = _controllerConf.getControllerBrokerPortOverride();
+    // Process each broker independently to ensure partial failures don't block cleanup of other brokers
+    for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
+      String brokerKey = entry.getKey();
+      InstanceInfo broker = brokers.get(brokerKey);
 
+      // Collect URLs for expired responses for THIS broker only
       List<String> brokerUrls = new ArrayList<>();
-      for (Map.Entry<String, List<CursorResponseNative>> entry : 
brokerCursorsMap.entrySet()) {
-        for (CursorResponse response : entry.getValue()) {
-          if (response.getExpirationTimeMs() <= currentTime) {
-            InstanceInfo broker = brokers.get(entry.getKey());
-            int port = portOverride > 0 ? portOverride : broker.getPort();
-            brokerUrls.add(
-                String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
-          }
+      for (CursorResponse response : entry.getValue()) {
+        if (response.getExpirationTimeMs() <= currentTime) {
+          int port = portOverride > 0 ? portOverride : broker.getPort();
+          brokerUrls.add(
+              String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), 
port, response.getRequestId()));
         }
-        Map<String, String> deleteStatus = getResponseMap(requestHeaders, 
brokerUrls, "DELETE", HttpDelete::new);
+      }
+
+      if (brokerUrls.isEmpty()) {
+        LOGGER.debug("No expired responses to clean up for broker: {}", 
brokerKey);
+        continue;
+      }
 
+      LOGGER.info("Cleaning up {} expired responses from broker: {}", 
brokerUrls.size(), brokerKey);
+
+      try {
+        Map<String, String> deleteStatus =
+            deleteExpiredResponses(requestHeaders, brokerUrls);
         deleteStatus.forEach(
             (key, value) -> LOGGER.info("ResponseStore delete response - Broker: {}. Response: {}", key, value));
+      } catch (Exception e) {
+        // Log error but continue with other brokers - don't let one broker failure block cleanup of others
+        LOGGER.error("Failed to delete expired responses from broker: {}. Will retry on next cleanup cycle.",
+            brokerKey, e);
+      }
+    }
+  }
+
+  /**
+   * Delete expired responses from brokers. Treats 404 responses as success since the goal
+   * is to ensure the response doesn't exist (idempotent delete).
+   */
+  private Map<String, String> deleteExpiredResponses(Map<String, String> 
requestHeaders, List<String> brokerUrls)
+      throws Exception {
+    List<Pair<String, String>> urlsAndRequestBodies = new 
ArrayList<>(brokerUrls.size());
+    brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(_executor, 
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+            DELETE_TIMEOUT_MS, "DELETE", HttpDelete::new);
+
+    Map<String, String> responseMap = new HashMap<>();
+    List<String> errMessages = new ArrayList<>();
+
+    for (int i = 0; i < brokerUrls.size(); i++) {
+      try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
+        URI uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+
+        if (status == 200) {
+          responseMap.put(getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort())), responseString);
+        } else if (status == 404) {
+          // 404 means the response is already deleted - this is acceptable for idempotent cleanup
+          LOGGER.debug("Response already deleted (404) for uri: {}", uri);
+          responseMap.put(getInstanceKey(uri.getHost(), Integer.toString(uri.getPort())),
+              "Already deleted (was 404)");
+        } else {
+          // Other errors are unexpected and should be logged
+          LOGGER.warn("Unexpected status={} from uri='{}', response='{}'", status, uri, responseString);
+          errMessages.add(String.format("Unexpected status=%d from uri='%s'", status, uri));
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to execute DELETE op", e);
+        errMessages.add(e.getMessage());
       }
-    } catch (Exception e) {
-      LOGGER.error(e.getMessage());
     }
+
+    if (!errMessages.isEmpty()) {
+      throw new RuntimeException("Some delete operations failed: " + 
StringUtils.join(errMessages, ", "));
+    }
+    return responseMap;
   }
 
   private Map<String, List<CursorResponseNative>> 
getAllQueryResults(Map<String, InstanceInfo> brokers,
@@ -170,51 +241,54 @@ public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
       int port = portOverride > 0 ? portOverride : broker.getPort();
       brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol, 
broker.getHost(), port));
     }
-    LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
-    Map<String, String> strResponseMap = getResponseMap(requestHeaders, 
brokerUrls, "GET", HttpGet::new);
-    return 
strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
e -> {
-      try {
-        return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
-        });
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }));
-  }
+    LOGGER.debug("Getting stored responses via broker urls: {}", brokerUrls);
 
-  private <T extends HttpUriRequestBase> Map<String, String> 
getResponseMap(Map<String, String> requestHeaders,
-      List<String> brokerUrls, String methodName, Function<String, T> 
httpRequestBaseSupplier)
-      throws Exception {
     List<Pair<String, String>> urlsAndRequestBodies = new 
ArrayList<>(brokerUrls.size());
     brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
 
     CompletionService<MultiHttpRequestResponse> completionService =
         new MultiHttpRequest(_executor, 
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
-            ResponseStoreCleaner.TIMEOUT_MS, methodName, 
httpRequestBaseSupplier);
-    Map<String, String> responseMap = new HashMap<>();
-    List<String> errMessages = new ArrayList<>(brokerUrls.size());
+            GET_TIMEOUT_MS, "GET", HttpGet::new);
+
+    Map<String, List<CursorResponseNative>> responseMap = new HashMap<>();
+    List<String> errMessages = new ArrayList<>();
+
     for (int i = 0; i < brokerUrls.size(); i++) {
       try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
-        // The completion order is different from brokerUrls, thus use uri in the response.
         URI uri = httpRequestResponse.getURI();
         int status = httpRequestResponse.getResponse().getCode();
         String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
-        // Unexpected server responses are collected and returned as exception.
+
         if (status != 200) {
-          throw new Exception(
-              String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
+          errMessages.add(String.format("Unexpected status=%d from uri='%s', response='%s'",
+              status, uri, responseString));
+          continue;
+        }
+
+        String brokerKey = getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort()));
+        try {
+          List<CursorResponseNative> responses = 
JsonUtils.stringToObject(responseString, new TypeReference<>() {
+          });
+          responseMap.put(brokerKey, responses);
+          LOGGER.debug("Got {} stored responses from broker: {}", 
responses.size(), brokerKey);
+        } catch (IOException ex) {
+          LOGGER.error("Failed to parse response from broker: {}", brokerKey, ex);
+          errMessages.add(String.format("Failed to parse response from broker '%s': %s", brokerKey, ex.getMessage()));
         }
-        responseMap.put((getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort()))), responseString);
       } catch (Exception e) {
-        LOGGER.error("Failed to execute {} op. ", methodName, e);
-        // Can't just throw exception from here as there is a need to release the other connections.
-        // So just collect the error msg to throw them together after the for-loop.
+        LOGGER.error("Failed to execute GET op", e);
         errMessages.add(e.getMessage());
       }
     }
+
     if (!errMessages.isEmpty()) {
-      throw new Exception("Unexpected responses from brokers: " + 
StringUtils.join(errMessages, ","));
+      LOGGER.warn("Some brokers failed to respond: {}", errMessages);
+      // Only throw if ALL brokers failed - allow partial success
+      if (responseMap.isEmpty()) {
+        throw new RuntimeException("All brokers failed to respond: " + 
StringUtils.join(errMessages, ", "));
+      }
     }
+
     return responseMap;
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java
new file mode 100644
index 00000000000..8fb072554cb
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class ResponseStoreCleanerTest {
+  private static final String RESPONSE_STORE_PATH = "/responseStore";
+  private static final long CURRENT_TIME_MS = 1000000L;
+
+  private final Executor _executor = Executors.newFixedThreadPool(4);
+  private final PoolingHttpClientConnectionManager _connectionManager = new 
PoolingHttpClientConnectionManager();
+  private final ControllerMetrics _controllerMetrics =
+      new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+  private PinotHelixResourceManager _helixResourceManager;
+  private LeadControllerManager _leadControllerManager;
+  private ControllerConf _controllerConf;
+
+  private final List<FakeBrokerServer> _brokerServers = new ArrayList<>();
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    _helixResourceManager = mock(PinotHelixResourceManager.class);
+    _leadControllerManager = mock(LeadControllerManager.class);
+    _controllerConf = new ControllerConf();
+
+    
when(_leadControllerManager.isLeaderForTable(org.mockito.ArgumentMatchers.anyString())).thenReturn(true);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (FakeBrokerServer server : _brokerServers) {
+      server.stop();
+    }
+    _brokerServers.clear();
+    _connectionManager.close();
+  }
+
+  private FakeBrokerServer createBrokerServer(List<CursorResponseNative> 
responses, int deleteStatusCode)
+      throws IOException {
+    FakeBrokerServer server = new FakeBrokerServer(responses, 
deleteStatusCode);
+    server.start();
+    _brokerServers.add(server);
+    return server;
+  }
+
+  private void setupBrokerInstances(List<FakeBrokerServer> servers) {
+    List<InstanceConfig> brokerConfigs = new ArrayList<>();
+    for (FakeBrokerServer server : servers) {
+      ZNRecord record = new ZNRecord("Broker_" + server.getHost() + "_" + 
server.getPort());
+      
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_HOST.name(), 
server.getHost());
+      
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.name(),
+          String.valueOf(server.getPort()));
+      record.setSimpleField("GRPC_PORT", "8090");
+      InstanceConfig instanceConfig = new InstanceConfig(record);
+      brokerConfigs.add(instanceConfig);
+    }
+    
when(_helixResourceManager.getAllBrokerInstanceConfigs()).thenReturn(brokerConfigs);
+  }
+
+  private CursorResponseNative createCursorResponse(String requestId, long 
expirationTimeMs) {
+    CursorResponseNative response = new CursorResponseNative();
+    response.setRequestId(requestId);
+    response.setExpirationTimeMs(expirationTimeMs);
+    response.setBrokerHost("localhost");
+    response.setBrokerPort(8099);
+    return response;
+  }
+
+  @Test
+  public void testCleanupExpiredResponses()
+      throws Exception {
+    // Create broker with 2 expired and 1 non-expired response
+    List<CursorResponseNative> responses = new ArrayList<>();
+    responses.add(createCursorResponse("expired-1", CURRENT_TIME_MS - 1000)); // expired
+    responses.add(createCursorResponse("expired-2", CURRENT_TIME_MS - 500));  // expired
+    responses.add(createCursorResponse("not-expired", CURRENT_TIME_MS + 1000)); // not expired
+
+    FakeBrokerServer broker = createBrokerServer(responses, 200);
+    setupBrokerInstances(Collections.singletonList(broker));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // Verify only expired responses were deleted
+    assertEquals(broker.getDeleteCount(), 2);
+    assertTrue(broker.getDeletedRequestIds().containsKey("expired-1"));
+    assertTrue(broker.getDeletedRequestIds().containsKey("expired-2"));
+    assertEquals(broker.getDeletedRequestIds().containsKey("not-expired"), 
false);
+  }
+
+  @Test
+  public void testCleanupTreats404AsSuccess()
+      throws Exception {
+    // Create broker that returns 404 for deletes (simulating already-deleted responses)
+    List<CursorResponseNative> responses = new ArrayList<>();
+    responses.add(createCursorResponse("already-deleted", CURRENT_TIME_MS - 
1000));
+
+    FakeBrokerServer broker = createBrokerServer(responses, 404);
+    setupBrokerInstances(Collections.singletonList(broker));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    // This should not throw an exception even though broker returns 404
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // Verify delete was attempted
+    assertEquals(broker.getDeleteCount(), 1);
+  }
+
+  @Test
+  public void testCleanupWithMultipleBrokers()
+      throws Exception {
+    // Create two brokers with different expired responses
+    List<CursorResponseNative> broker1Responses = new ArrayList<>();
+    broker1Responses.add(createCursorResponse("broker1-expired-1", 
CURRENT_TIME_MS - 1000));
+    broker1Responses.add(createCursorResponse("broker1-expired-2", 
CURRENT_TIME_MS - 500));
+
+    List<CursorResponseNative> broker2Responses = new ArrayList<>();
+    broker2Responses.add(createCursorResponse("broker2-expired-1", 
CURRENT_TIME_MS - 2000));
+
+    FakeBrokerServer broker1 = createBrokerServer(broker1Responses, 200);
+    FakeBrokerServer broker2 = createBrokerServer(broker2Responses, 200);
+    setupBrokerInstances(List.of(broker1, broker2));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // Verify broker1 only received its own delete requests
+    assertEquals(broker1.getDeleteCount(), 2);
+    
assertTrue(broker1.getDeletedRequestIds().containsKey("broker1-expired-1"));
+    
assertTrue(broker1.getDeletedRequestIds().containsKey("broker1-expired-2"));
+    
assertEquals(broker1.getDeletedRequestIds().containsKey("broker2-expired-1"), 
false);
+
+    // Verify broker2 only received its own delete requests
+    assertEquals(broker2.getDeleteCount(), 1);
+    
assertTrue(broker2.getDeletedRequestIds().containsKey("broker2-expired-1"));
+    
assertEquals(broker2.getDeletedRequestIds().containsKey("broker1-expired-1"), 
false);
+  }
+
+  @Test
+  public void testPartialBrokerFailureDoesNotBlockOthers()
+      throws Exception {
+    // Create one broker that fails (returns 500) and one that succeeds
+    List<CursorResponseNative> broker1Responses = new ArrayList<>();
+    broker1Responses.add(createCursorResponse("broker1-expired", 
CURRENT_TIME_MS - 1000));
+
+    List<CursorResponseNative> broker2Responses = new ArrayList<>();
+    broker2Responses.add(createCursorResponse("broker2-expired", 
CURRENT_TIME_MS - 1000));
+
+    FakeBrokerServer broker1 = createBrokerServer(broker1Responses, 500); // Will fail
+    FakeBrokerServer broker2 = createBrokerServer(broker2Responses, 200); // Will succeed
+    setupBrokerInstances(List.of(broker1, broker2));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    // Should not throw, even though broker1 fails
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // Both brokers should have received delete attempts
+    assertEquals(broker1.getDeleteCount(), 1);
+    assertEquals(broker2.getDeleteCount(), 1);
+  }
+
+  @Test
+  public void testNoExpiredResponses()
+      throws Exception {
+    // Create broker with only non-expired responses
+    List<CursorResponseNative> responses = new ArrayList<>();
+    responses.add(createCursorResponse("not-expired-1", CURRENT_TIME_MS + 
1000));
+    responses.add(createCursorResponse("not-expired-2", CURRENT_TIME_MS + 
2000));
+
+    FakeBrokerServer broker = createBrokerServer(responses, 200);
+    setupBrokerInstances(Collections.singletonList(broker));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // No deletes should have been attempted
+    assertEquals(broker.getDeleteCount(), 0);
+  }
+
+  @Test
+  public void testEmptyResponseStore()
+      throws Exception {
+    // Create broker with no responses
+    FakeBrokerServer broker = createBrokerServer(Collections.emptyList(), 200);
+    setupBrokerInstances(Collections.singletonList(broker));
+
+    ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+        _controllerConf, _helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executor, _connectionManager);
+
+    cleaner.doClean(CURRENT_TIME_MS);
+
+    // No deletes should have been attempted
+    assertEquals(broker.getDeleteCount(), 0);
+  }
+
+  /**
+   * Fake broker server that simulates the ResponseStoreResource endpoints.
+   */
+  private static class FakeBrokerServer extends FakeHttpServer {
+    private final List<CursorResponseNative> _responses;
+    private final int _deleteStatusCode;
+    private final AtomicInteger _deleteCount = new AtomicInteger(0);
+    private final Map<String, Boolean> _deletedRequestIds = new 
ConcurrentHashMap<>();
+
+    FakeBrokerServer(List<CursorResponseNative> responses, int 
deleteStatusCode) {
+      _responses = responses;
+      _deleteStatusCode = deleteStatusCode;
+    }
+
+    void start()
+        throws IOException {
+      HttpHandler handler = new HttpHandler() {
+        @Override
+        public void handle(HttpExchange exchange)
+            throws IOException {
+          String method = exchange.getRequestMethod();
+          URI uri = exchange.getRequestURI();
+          String path = uri.getPath();
+
+          String responseBody;
+          int statusCode;
+
+          if ("GET".equals(method) && RESPONSE_STORE_PATH.equals(path)) {
+            // GET /responseStore - return list of responses
+            responseBody = JsonUtils.objectToString(_responses);
+            statusCode = 200;
+          } else if ("DELETE".equals(method) && 
path.startsWith(RESPONSE_STORE_PATH + "/")) {
+            // DELETE /responseStore/{requestId}
+            String requestId = path.substring(RESPONSE_STORE_PATH.length() + 
1);
+            _deleteCount.incrementAndGet();
+            _deletedRequestIds.put(requestId, true);
+            responseBody = "Deleted " + requestId;
+            statusCode = _deleteStatusCode;
+          } else {
+            responseBody = "Not found";
+            statusCode = 404;
+          }
+
+          byte[] responseBytes = responseBody.getBytes();
+          exchange.sendResponseHeaders(statusCode, responseBytes.length);
+          try (OutputStream os = exchange.getResponseBody()) {
+            os.write(responseBytes);
+          }
+        }
+      };
+
+      super.start(RESPONSE_STORE_PATH, handler);
+    }
+
+    String getHost() {
+      return "localhost";
+    }
+
+    int getPort() {
+      return _httpServer.getAddress().getPort();
+    }
+
+    int getDeleteCount() {
+      return _deleteCount.get();
+    }
+
+    Map<String, Boolean> getDeletedRequestIds() {
+      return _deletedRequestIds;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to