This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3f5466035ac Fix ResponseStoreCleaner bugs causing unbounded response
store growth and cleanup failures (#17622)
3f5466035ac is described below
commit 3f5466035ac21683f4cbdb2c3bb710f815bb8476
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 5 11:02:55 2026 -0800
Fix ResponseStoreCleaner bugs causing unbounded response store growth and
cleanup failures (#17622)
---
.../controller/cursors/ResponseStoreCleaner.java | 164 +++++++---
.../cursors/ResponseStoreCleanerTest.java | 333 +++++++++++++++++++++
2 files changed, 452 insertions(+), 45 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
index e2a5fc26465..b7bd65667d5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
@@ -29,13 +29,10 @@ import java.util.Properties;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
-import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.helix.model.InstanceConfig;
@@ -66,7 +63,9 @@ import org.slf4j.LoggerFactory;
*/
public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ResponseStoreCleaner.class);
- private static final int TIMEOUT_MS = 3000;
+ // Increased timeout to handle large response stores
+ private static final int GET_TIMEOUT_MS = 60_000;
+ private static final int DELETE_TIMEOUT_MS = 10_000;
private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
private static final String DELETE_QUERY_RESULT =
"%s://%s:%d/responseStore/%s";
// Used in tests to trigger the delete instead of waiting for the wall clock to move to an appropriate time.
@@ -132,32 +131,104 @@ public class ResponseStoreCleaner extends
ControllerPeriodicTask<Void> {
Integer.parseInt(HelixHelper.getGrpcPort(broker))));
}
+ Map<String, String> requestHeaders;
try {
- Map<String, String> requestHeaders =
AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+ requestHeaders = AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+ } catch (Exception e) {
+ LOGGER.error("Failed to create auth headers for response store cleanup",
e);
+ return;
+ }
- Map<String, List<CursorResponseNative>> brokerCursorsMap =
getAllQueryResults(brokers, requestHeaders);
+ Map<String, List<CursorResponseNative>> brokerCursorsMap;
+ try {
+ brokerCursorsMap = getAllQueryResults(brokers, requestHeaders);
+ } catch (Exception e) {
+ LOGGER.error("Failed to get query results from brokers for cleanup", e);
+ return;
+ }
+
+ String protocol = _controllerConf.getControllerBrokerProtocol();
+ int portOverride = _controllerConf.getControllerBrokerPortOverride();
- String protocol = _controllerConf.getControllerBrokerProtocol();
- int portOverride = _controllerConf.getControllerBrokerPortOverride();
+ // Process each broker independently to ensure partial failures don't block cleanup of other brokers
+ for (Map.Entry<String, List<CursorResponseNative>> entry :
brokerCursorsMap.entrySet()) {
+ String brokerKey = entry.getKey();
+ InstanceInfo broker = brokers.get(brokerKey);
+ // Collect URLs for expired responses for THIS broker only
List<String> brokerUrls = new ArrayList<>();
- for (Map.Entry<String, List<CursorResponseNative>> entry :
brokerCursorsMap.entrySet()) {
- for (CursorResponse response : entry.getValue()) {
- if (response.getExpirationTimeMs() <= currentTime) {
- InstanceInfo broker = brokers.get(entry.getKey());
- int port = portOverride > 0 ? portOverride : broker.getPort();
- brokerUrls.add(
- String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(),
port, response.getRequestId()));
- }
+ for (CursorResponse response : entry.getValue()) {
+ if (response.getExpirationTimeMs() <= currentTime) {
+ int port = portOverride > 0 ? portOverride : broker.getPort();
+ brokerUrls.add(
+ String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(),
port, response.getRequestId()));
}
- Map<String, String> deleteStatus = getResponseMap(requestHeaders,
brokerUrls, "DELETE", HttpDelete::new);
+ }
+
+ if (brokerUrls.isEmpty()) {
+ LOGGER.debug("No expired responses to clean up for broker: {}",
brokerKey);
+ continue;
+ }
+ LOGGER.info("Cleaning up {} expired responses from broker: {}",
brokerUrls.size(), brokerKey);
+
+ try {
+ Map<String, String> deleteStatus =
+ deleteExpiredResponses(requestHeaders, brokerUrls);
deleteStatus.forEach(
(key, value) -> LOGGER.info("ResponseStore delete response -
Broker: {}. Response: {}", key, value));
+ } catch (Exception e) {
+ // Log error but continue with other brokers - don't let one broker failure block cleanup of others
+ LOGGER.error("Failed to delete expired responses from broker: {}. Will
retry on next cleanup cycle.",
+ brokerKey, e);
+ }
+ }
+ }
+
+ /**
+ * Delete expired responses from brokers. Treats 404 responses as success since the goal
+ * is to ensure the response doesn't exist (idempotent delete).
+ */
+ private Map<String, String> deleteExpiredResponses(Map<String, String>
requestHeaders, List<String> brokerUrls)
+ throws Exception {
+ List<Pair<String, String>> urlsAndRequestBodies = new
ArrayList<>(brokerUrls.size());
+ brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(_executor,
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+ DELETE_TIMEOUT_MS, "DELETE", HttpDelete::new);
+
+ Map<String, String> responseMap = new HashMap<>();
+ List<String> errMessages = new ArrayList<>();
+
+ for (int i = 0; i < brokerUrls.size(); i++) {
+ try (MultiHttpRequestResponse httpRequestResponse =
completionService.take().get()) {
+ URI uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+
+ if (status == 200) {
+ responseMap.put(getInstanceKey(uri.getHost(),
Integer.toString(uri.getPort())), responseString);
+ } else if (status == 404) {
+ // 404 means the response is already deleted - this is acceptable for idempotent cleanup
+ LOGGER.debug("Response already deleted (404) for uri: {}", uri);
+ responseMap.put(getInstanceKey(uri.getHost(),
Integer.toString(uri.getPort())),
+ "Already deleted (was 404)");
+ } else {
+ // Other errors are unexpected and should be logged
+ LOGGER.warn("Unexpected status={} from uri='{}', response='{}'",
status, uri, responseString);
+ errMessages.add(String.format("Unexpected status=%d from uri='%s'",
status, uri));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to execute DELETE op", e);
+ errMessages.add(e.getMessage());
}
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
}
+
+ if (!errMessages.isEmpty()) {
+ throw new RuntimeException("Some delete operations failed: " +
StringUtils.join(errMessages, ", "));
+ }
+ return responseMap;
}
private Map<String, List<CursorResponseNative>>
getAllQueryResults(Map<String, InstanceInfo> brokers,
@@ -170,51 +241,54 @@ public class ResponseStoreCleaner extends
ControllerPeriodicTask<Void> {
int port = portOverride > 0 ? portOverride : broker.getPort();
brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol,
broker.getHost(), port));
}
- LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
- Map<String, String> strResponseMap = getResponseMap(requestHeaders,
brokerUrls, "GET", HttpGet::new);
- return
strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
e -> {
- try {
- return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
- });
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }));
- }
+ LOGGER.debug("Getting stored responses via broker urls: {}", brokerUrls);
- private <T extends HttpUriRequestBase> Map<String, String>
getResponseMap(Map<String, String> requestHeaders,
- List<String> brokerUrls, String methodName, Function<String, T>
httpRequestBaseSupplier)
- throws Exception {
List<Pair<String, String>> urlsAndRequestBodies = new
ArrayList<>(brokerUrls.size());
brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
CompletionService<MultiHttpRequestResponse> completionService =
new MultiHttpRequest(_executor,
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
- ResponseStoreCleaner.TIMEOUT_MS, methodName,
httpRequestBaseSupplier);
- Map<String, String> responseMap = new HashMap<>();
- List<String> errMessages = new ArrayList<>(brokerUrls.size());
+ GET_TIMEOUT_MS, "GET", HttpGet::new);
+
+ Map<String, List<CursorResponseNative>> responseMap = new HashMap<>();
+ List<String> errMessages = new ArrayList<>();
+
for (int i = 0; i < brokerUrls.size(); i++) {
try (MultiHttpRequestResponse httpRequestResponse =
completionService.take().get()) {
- // The completion order is different from brokerUrls, thus use uri in
the response.
URI uri = httpRequestResponse.getURI();
int status = httpRequestResponse.getResponse().getCode();
String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
- // Unexpected server responses are collected and returned as exception.
+
if (status != 200) {
- throw new Exception(
- String.format("Unexpected status=%d and response='%s' from
uri='%s'", status, responseString, uri));
+ errMessages.add(String.format("Unexpected status=%d from uri='%s',
response='%s'",
+ status, uri, responseString));
+ continue;
+ }
+
+ String brokerKey = getInstanceKey(uri.getHost(),
Integer.toString(uri.getPort()));
+ try {
+ List<CursorResponseNative> responses =
JsonUtils.stringToObject(responseString, new TypeReference<>() {
+ });
+ responseMap.put(brokerKey, responses);
+ LOGGER.debug("Got {} stored responses from broker: {}",
responses.size(), brokerKey);
+ } catch (IOException ex) {
+ LOGGER.error("Failed to parse response from broker: {}", brokerKey,
ex);
+ errMessages.add(String.format("Failed to parse response from broker
'%s': %s", brokerKey, ex.getMessage()));
}
- responseMap.put((getInstanceKey(uri.getHost(),
Integer.toString(uri.getPort()))), responseString);
} catch (Exception e) {
- LOGGER.error("Failed to execute {} op. ", methodName, e);
- // Can't just throw exception from here as there is a need to release
the other connections.
- // So just collect the error msg to throw them together after the
for-loop.
+ LOGGER.error("Failed to execute GET op", e);
errMessages.add(e.getMessage());
}
}
+
if (!errMessages.isEmpty()) {
- throw new Exception("Unexpected responses from brokers: " +
StringUtils.join(errMessages, ","));
+ LOGGER.warn("Some brokers failed to respond: {}", errMessages);
+ // Only throw if ALL brokers failed - allow partial success
+ if (responseMap.isEmpty()) {
+ throw new RuntimeException("All brokers failed to respond: " +
StringUtils.join(errMessages, ", "));
+ }
}
+
return responseMap;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java
new file mode 100644
index 00000000000..8fb072554cb
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/cursors/ResponseStoreCleanerTest.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class ResponseStoreCleanerTest {
+ private static final String RESPONSE_STORE_PATH = "/responseStore";
+ private static final long CURRENT_TIME_MS = 1000000L;
+
+ private final Executor _executor = Executors.newFixedThreadPool(4);
+ private final PoolingHttpClientConnectionManager _connectionManager = new
PoolingHttpClientConnectionManager();
+ private final ControllerMetrics _controllerMetrics =
+ new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+ private PinotHelixResourceManager _helixResourceManager;
+ private LeadControllerManager _leadControllerManager;
+ private ControllerConf _controllerConf;
+
+ private final List<FakeBrokerServer> _brokerServers = new ArrayList<>();
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ _helixResourceManager = mock(PinotHelixResourceManager.class);
+ _leadControllerManager = mock(LeadControllerManager.class);
+ _controllerConf = new ControllerConf();
+
+
when(_leadControllerManager.isLeaderForTable(org.mockito.ArgumentMatchers.anyString())).thenReturn(true);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (FakeBrokerServer server : _brokerServers) {
+ server.stop();
+ }
+ _brokerServers.clear();
+ _connectionManager.close();
+ }
+
+ private FakeBrokerServer createBrokerServer(List<CursorResponseNative>
responses, int deleteStatusCode)
+ throws IOException {
+ FakeBrokerServer server = new FakeBrokerServer(responses,
deleteStatusCode);
+ server.start();
+ _brokerServers.add(server);
+ return server;
+ }
+
+ private void setupBrokerInstances(List<FakeBrokerServer> servers) {
+ List<InstanceConfig> brokerConfigs = new ArrayList<>();
+ for (FakeBrokerServer server : servers) {
+ ZNRecord record = new ZNRecord("Broker_" + server.getHost() + "_" +
server.getPort());
+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_HOST.name(),
server.getHost());
+
record.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.name(),
+ String.valueOf(server.getPort()));
+ record.setSimpleField("GRPC_PORT", "8090");
+ InstanceConfig instanceConfig = new InstanceConfig(record);
+ brokerConfigs.add(instanceConfig);
+ }
+
when(_helixResourceManager.getAllBrokerInstanceConfigs()).thenReturn(brokerConfigs);
+ }
+
+ private CursorResponseNative createCursorResponse(String requestId, long
expirationTimeMs) {
+ CursorResponseNative response = new CursorResponseNative();
+ response.setRequestId(requestId);
+ response.setExpirationTimeMs(expirationTimeMs);
+ response.setBrokerHost("localhost");
+ response.setBrokerPort(8099);
+ return response;
+ }
+
+ @Test
+ public void testCleanupExpiredResponses()
+ throws Exception {
+ // Create broker with 2 expired and 1 non-expired response
+ List<CursorResponseNative> responses = new ArrayList<>();
+ responses.add(createCursorResponse("expired-1", CURRENT_TIME_MS - 1000));
// expired
+ responses.add(createCursorResponse("expired-2", CURRENT_TIME_MS - 500));
// expired
+ responses.add(createCursorResponse("not-expired", CURRENT_TIME_MS +
1000)); // not expired
+
+ FakeBrokerServer broker = createBrokerServer(responses, 200);
+ setupBrokerInstances(Collections.singletonList(broker));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // Verify only expired responses were deleted
+ assertEquals(broker.getDeleteCount(), 2);
+ assertTrue(broker.getDeletedRequestIds().containsKey("expired-1"));
+ assertTrue(broker.getDeletedRequestIds().containsKey("expired-2"));
+ assertEquals(broker.getDeletedRequestIds().containsKey("not-expired"),
false);
+ }
+
+ @Test
+ public void testCleanupTreats404AsSuccess()
+ throws Exception {
+ // Create broker that returns 404 for deletes (simulating already-deleted responses)
+ List<CursorResponseNative> responses = new ArrayList<>();
+ responses.add(createCursorResponse("already-deleted", CURRENT_TIME_MS -
1000));
+
+ FakeBrokerServer broker = createBrokerServer(responses, 404);
+ setupBrokerInstances(Collections.singletonList(broker));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ // This should not throw an exception even though broker returns 404
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // Verify delete was attempted
+ assertEquals(broker.getDeleteCount(), 1);
+ }
+
+ @Test
+ public void testCleanupWithMultipleBrokers()
+ throws Exception {
+ // Create two brokers with different expired responses
+ List<CursorResponseNative> broker1Responses = new ArrayList<>();
+ broker1Responses.add(createCursorResponse("broker1-expired-1",
CURRENT_TIME_MS - 1000));
+ broker1Responses.add(createCursorResponse("broker1-expired-2",
CURRENT_TIME_MS - 500));
+
+ List<CursorResponseNative> broker2Responses = new ArrayList<>();
+ broker2Responses.add(createCursorResponse("broker2-expired-1",
CURRENT_TIME_MS - 2000));
+
+ FakeBrokerServer broker1 = createBrokerServer(broker1Responses, 200);
+ FakeBrokerServer broker2 = createBrokerServer(broker2Responses, 200);
+ setupBrokerInstances(List.of(broker1, broker2));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // Verify broker1 only received its own delete requests
+ assertEquals(broker1.getDeleteCount(), 2);
+
assertTrue(broker1.getDeletedRequestIds().containsKey("broker1-expired-1"));
+
assertTrue(broker1.getDeletedRequestIds().containsKey("broker1-expired-2"));
+
assertEquals(broker1.getDeletedRequestIds().containsKey("broker2-expired-1"),
false);
+
+ // Verify broker2 only received its own delete requests
+ assertEquals(broker2.getDeleteCount(), 1);
+
assertTrue(broker2.getDeletedRequestIds().containsKey("broker2-expired-1"));
+
assertEquals(broker2.getDeletedRequestIds().containsKey("broker1-expired-1"),
false);
+ }
+
+ @Test
+ public void testPartialBrokerFailureDoesNotBlockOthers()
+ throws Exception {
+ // Create one broker that fails (returns 500) and one that succeeds
+ List<CursorResponseNative> broker1Responses = new ArrayList<>();
+ broker1Responses.add(createCursorResponse("broker1-expired",
CURRENT_TIME_MS - 1000));
+
+ List<CursorResponseNative> broker2Responses = new ArrayList<>();
+ broker2Responses.add(createCursorResponse("broker2-expired",
CURRENT_TIME_MS - 1000));
+
+ FakeBrokerServer broker1 = createBrokerServer(broker1Responses, 500); //
Will fail
+ FakeBrokerServer broker2 = createBrokerServer(broker2Responses, 200); //
Will succeed
+ setupBrokerInstances(List.of(broker1, broker2));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ // Should not throw, even though broker1 fails
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // Both brokers should have received delete attempts
+ assertEquals(broker1.getDeleteCount(), 1);
+ assertEquals(broker2.getDeleteCount(), 1);
+ }
+
+ @Test
+ public void testNoExpiredResponses()
+ throws Exception {
+ // Create broker with only non-expired responses
+ List<CursorResponseNative> responses = new ArrayList<>();
+ responses.add(createCursorResponse("not-expired-1", CURRENT_TIME_MS +
1000));
+ responses.add(createCursorResponse("not-expired-2", CURRENT_TIME_MS +
2000));
+
+ FakeBrokerServer broker = createBrokerServer(responses, 200);
+ setupBrokerInstances(Collections.singletonList(broker));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // No deletes should have been attempted
+ assertEquals(broker.getDeleteCount(), 0);
+ }
+
+ @Test
+ public void testEmptyResponseStore()
+ throws Exception {
+ // Create broker with no responses
+ FakeBrokerServer broker = createBrokerServer(Collections.emptyList(), 200);
+ setupBrokerInstances(Collections.singletonList(broker));
+
+ ResponseStoreCleaner cleaner = new ResponseStoreCleaner(
+ _controllerConf, _helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executor, _connectionManager);
+
+ cleaner.doClean(CURRENT_TIME_MS);
+
+ // No deletes should have been attempted
+ assertEquals(broker.getDeleteCount(), 0);
+ }
+
+ /**
+ * Fake broker server that simulates the ResponseStoreResource endpoints.
+ */
+ private static class FakeBrokerServer extends FakeHttpServer {
+ private final List<CursorResponseNative> _responses;
+ private final int _deleteStatusCode;
+ private final AtomicInteger _deleteCount = new AtomicInteger(0);
+ private final Map<String, Boolean> _deletedRequestIds = new
ConcurrentHashMap<>();
+
+ FakeBrokerServer(List<CursorResponseNative> responses, int
deleteStatusCode) {
+ _responses = responses;
+ _deleteStatusCode = deleteStatusCode;
+ }
+
+ void start()
+ throws IOException {
+ HttpHandler handler = new HttpHandler() {
+ @Override
+ public void handle(HttpExchange exchange)
+ throws IOException {
+ String method = exchange.getRequestMethod();
+ URI uri = exchange.getRequestURI();
+ String path = uri.getPath();
+
+ String responseBody;
+ int statusCode;
+
+ if ("GET".equals(method) && RESPONSE_STORE_PATH.equals(path)) {
+ // GET /responseStore - return list of responses
+ responseBody = JsonUtils.objectToString(_responses);
+ statusCode = 200;
+ } else if ("DELETE".equals(method) &&
path.startsWith(RESPONSE_STORE_PATH + "/")) {
+ // DELETE /responseStore/{requestId}
+ String requestId = path.substring(RESPONSE_STORE_PATH.length() +
1);
+ _deleteCount.incrementAndGet();
+ _deletedRequestIds.put(requestId, true);
+ responseBody = "Deleted " + requestId;
+ statusCode = _deleteStatusCode;
+ } else {
+ responseBody = "Not found";
+ statusCode = 404;
+ }
+
+ byte[] responseBytes = responseBody.getBytes();
+ exchange.sendResponseHeaders(statusCode, responseBytes.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(responseBytes);
+ }
+ }
+ };
+
+ super.start(RESPONSE_STORE_PATH, handler);
+ }
+
+ String getHost() {
+ return "localhost";
+ }
+
+ int getPort() {
+ return _httpServer.getAddress().getPort();
+ }
+
+ int getDeleteCount() {
+ return _deleteCount.get();
+ }
+
+ Map<String, Boolean> getDeletedRequestIds() {
+ return _deletedRequestIds;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]