This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ca0dfc10dae Fix compatibility stream count retries (#18532)
ca0dfc10dae is described below
commit ca0dfc10dae24a5f567209c990ffc94081100630
Author: Xiang Fu <[email protected]>
AuthorDate: Tue May 19 20:56:31 2026 -0700
Fix compatibility stream count retries (#18532)
The compatibility verifier polls SELECT count(*) while rolling brokers and
servers between versions. During those transitions Pinot can return transient
broker/server errors or partial count responses before routing and query
scheduling stabilize.
Retry only those verifier polling failures, keep non-retryable query errors
fail-fast, and cover the retry loop with focused StreamOp tests.
---
.../java/org/apache/pinot/compat/StreamOp.java | 138 +++++++++++++---
.../main/java/org/apache/pinot/compat/Utils.java | 5 +-
.../java/org/apache/pinot/compat/StreamOpTest.java | 174 +++++++++++++++++++++
3 files changed, 294 insertions(+), 23 deletions(-)
diff --git
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
index 941abcf6afa..b58987a5310 100644
---
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
+++
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
+import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -32,6 +33,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongPredicate;
+import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -52,7 +55,6 @@ import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,7 +94,21 @@ public class StreamOp extends BaseOp {
private static final String NUM_SERVERS_QUERIED = "numServersQueried";
private static final String NUM_SERVERS_RESPONEDED = "numServersResponded";
private static final String TOTAL_DOCS = "totalDocs";
+ private static final long COUNT_QUERY_TIMEOUT_MS = 60_000L;
+ private static final long COUNT_QUERY_RETRY_INTERVAL_MS = 100L;
private static final short KAFKA_REPLICATION_FACTOR = 1;
+ private static final Set<Integer> RETRYABLE_QUERY_ERROR_CODES = Set.of(
+ QueryErrorCode.BROKER_INSTANCE_MISSING.getId(),
+ QueryErrorCode.BROKER_RESOURCE_MISSING.getId(),
+ QueryErrorCode.BROKER_REQUEST_SEND.getId(),
+ QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE.getId(),
+ QueryErrorCode.BROKER_TIMEOUT.getId(),
+ QueryErrorCode.EXECUTION_TIMEOUT.getId(),
+ QueryErrorCode.QUERY_SCHEDULING_TIMEOUT.getId(),
+ QueryErrorCode.SERVER_SHUTTING_DOWN.getId(),
+ QueryErrorCode.SERVER_NOT_RESPONDING.getId(),
+ QueryErrorCode.SERVER_SEGMENT_MISSING.getId(),
+ QueryErrorCode.SERVER_TABLE_MISSING.getId());
public StreamOp() {
super(OpType.STREAM_OP);
@@ -195,7 +211,7 @@ public class StreamOp extends BaseOp {
long existingTotalDoc = 0;
// get original rows
- existingTotalDoc = fetchExistingTotalDocs(tableName);
+ existingTotalDoc = waitForExistingTotalDocs(tableName,
COUNT_QUERY_TIMEOUT_MS);
// push csv data to kafka
Properties publisherProps = new Properties();
@@ -254,7 +270,7 @@ public class StreamOp extends BaseOp {
}
// verify number of rows increases as expected
- waitForDocsLoaded(tableName, existingTotalDoc + _numRows, 60_000L);
+ waitForDocsLoaded(tableName, existingTotalDoc + _numRows,
COUNT_QUERY_TIMEOUT_MS);
LOGGER.info("Verified {} new rows in table: {}", _numRows, tableName);
return true;
} catch (Exception e) {
@@ -266,21 +282,27 @@ public class StreamOp extends BaseOp {
private long fetchExistingTotalDocs(String tableName)
throws Exception {
String query = "SELECT count(*) FROM " + tableName;
- JsonNode response = Utils.postSqlQuery(query,
ClusterDescriptor.getInstance().getBrokerUrl());
+ JsonNode response;
+ try {
+ response = Utils.postSqlQuery(query,
ClusterDescriptor.getInstance().getBrokerUrl());
+ } catch (IOException e) {
+ throw new RetryableQueryException(String.format("Failed to query Table:
%s", tableName), e);
+ }
+ return extractTotalDocs(query, response);
+ }
+
+ static long extractTotalDocs(String query, JsonNode response)
+ throws RetryableQueryException {
if (response == null) {
- String errorMsg = String.format("Failed to query Table: %s", tableName);
- LOGGER.error(errorMsg);
- throw new RuntimeException(errorMsg);
+ throw new RetryableQueryException(String.format("Failed when running
query: %s; got null response", query));
}
if (response.has(EXCEPTIONS) && !response.get(EXCEPTIONS).isEmpty()) {
String errorMsg =
String.format("Failed when running query: '%s'; got
exceptions:\n%s\n", query, response.toPrettyString());
JsonNode exceptions = response.get(EXCEPTIONS);
- JsonNode errorCode = exceptions.get(ERROR_CODE);
- if (QueryErrorCode.BROKER_INSTANCE_MISSING.getId() == errorCode.asInt())
{
- LOGGER.warn("{}.Trying again", errorMsg);
- return 0;
+ if (hasOnlyRetryableQueryExceptions(exceptions)) {
+ throw new RetryableQueryException(errorMsg);
}
LOGGER.error(errorMsg);
throw new RuntimeException(errorMsg);
@@ -289,8 +311,7 @@ public class StreamOp extends BaseOp {
if (response.has(NUM_SERVERS_QUERIED) &&
response.has(NUM_SERVERS_RESPONEDED)
&& response.get(NUM_SERVERS_QUERIED).asInt() >
response.get(NUM_SERVERS_RESPONEDED).asInt()) {
String errorMsg = String.format("Failed when running query: %s; the
response contains partial results", query);
- LOGGER.error(errorMsg);
- throw new RuntimeException(errorMsg);
+ throw new RetryableQueryException(errorMsg);
}
if (!response.has(TOTAL_DOCS)) {
@@ -301,15 +322,90 @@ public class StreamOp extends BaseOp {
return response.get(TOTAL_DOCS).asLong();
}
- private void waitForDocsLoaded(String tableName, long targetDocs, long
timeoutMs) {
+ private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+ throws Exception {
+ AtomicLong loadedDocs = new AtomicLong(-1);
+ waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+ loadedDocs.set(existingTotalDocs);
+ return true;
+ }, () -> "Failed to fetch existing documents for table: " + tableName);
+ return loadedDocs.get();
+ }
+
+ private void waitForDocsLoaded(String tableName, long targetDocs, long
timeoutMs)
+ throws Exception {
LOGGER.info("Wait Doc to load ...");
AtomicLong loadedDocs = new AtomicLong(-1);
- TestUtils.waitForCondition(
- () -> {
- long existingTotalDocs = fetchExistingTotalDocs(tableName);
- loadedDocs.set(existingTotalDocs);
- return existingTotalDocs == targetDocs;
- }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents.
Found " + loadedDocs.get() + " instead",
- Duration.ofSeconds(1));
+ waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+ loadedDocs.set(existingTotalDocs);
+ return existingTotalDocs == targetDocs;
+ }, () -> "Failed to load " + targetDocs + " documents. Found " +
loadedDocs.get() + " instead");
+ }
+
+ private void waitForDocs(String tableName, long timeoutMs, LongPredicate
condition, Supplier<String> errorMessage)
+ throws Exception {
+ waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS,
condition, errorMessage,
+ () -> fetchExistingTotalDocs(tableName));
+ }
+
+ void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs,
LongPredicate condition,
+ Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+ throws Exception {
+ long endTimeMs = System.currentTimeMillis() + timeoutMs;
+ long nextLogTimeMs = 0;
+ RetryableQueryException lastRetryableQueryException = null;
+ while (System.currentTimeMillis() < endTimeMs) {
+ try {
+ if (condition.test(totalDocsSupplier.getAsLong())) {
+ return;
+ }
+ } catch (RetryableQueryException e) {
+ lastRetryableQueryException = e;
+ long currentTimeMs = System.currentTimeMillis();
+ if (currentTimeMs >= nextLogTimeMs) {
+ LOGGER.warn("Unable to fetch total docs for table: {}. Trying
again", tableName, e);
+ nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+ }
+ }
+ Thread.sleep(retryIntervalMs);
+ }
+ throw new RuntimeException(errorMessage.get(),
lastRetryableQueryException);
+ }
+
+ static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+ if (exceptions.isArray()) {
+ for (JsonNode exception : exceptions) {
+ if (!isRetryableQueryException(exception)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return isRetryableQueryException(exceptions);
+ }
+
+ private static boolean isRetryableQueryException(JsonNode exception) {
+ Integer errorCode = getErrorCode(exception);
+ return errorCode != null &&
RETRYABLE_QUERY_ERROR_CODES.contains(errorCode);
+ }
+
+ private static Integer getErrorCode(JsonNode exception) {
+ JsonNode errorCode = exception.get(ERROR_CODE);
+ return errorCode != null ? errorCode.asInt() : null;
+ }
+
+ interface TotalDocsSupplier {
+ long getAsLong()
+ throws Exception;
+ }
+
+ static class RetryableQueryException extends Exception {
+ RetryableQueryException(String message) {
+ super(message);
+ }
+
+ RetryableQueryException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
}
diff --git
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
index cf90c625b1f..969e3cd3820 100644
---
a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
+++
b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
@@ -51,7 +51,7 @@ public class Utils {
}
public static JsonNode postSqlQuery(String query, String brokerBaseApiUrl)
- throws Exception {
+ throws IOException {
ObjectNode payload = JsonUtils.newObjectNode();
payload.put("sql", query);
payload.put("queryOptions", "groupByMode=sql;responseFormat=sql");
@@ -60,7 +60,8 @@ public class Utils {
ControllerTest.sendPostRequest(brokerBaseApiUrl + "/query/sql",
payload.toString()));
}
- public static JsonNode postMultiStageSqlQuery(String query, String
brokerBaseApiUrl) throws Exception {
+ public static JsonNode postMultiStageSqlQuery(String query, String
brokerBaseApiUrl)
+ throws IOException {
ObjectNode payload = JsonUtils.newObjectNode();
payload.put("sql", query);
payload.put("queryOptions", "useMultistageEngine=true");
diff --git
a/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java
b/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java
new file mode 100644
index 00000000000..1855d3f1f3b
--- /dev/null
+++
b/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.compat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests retry classification for broker responses returned while compatibility tests roll components.
+public class StreamOpTest {
+ private static final String QUERY = "SELECT count(*) FROM testTable";
+
+ @Test
+ public void testRetryableExceptionArrayWithNumericErrorCode()
+ throws Exception {
+ JsonNode response =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+ + QueryErrorCode.BROKER_INSTANCE_MISSING.getId() + ",\"message\":\"No
routing table\"}]}");
+
+
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+ }
+
+ @Test
+ public void testRetryableExceptionArrayWithBrokerSegmentUnavailableCode()
+ throws Exception {
+ JsonNode response =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+ + QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE.getId() +
",\"message\":\"Segment unavailable\"}]}");
+
+
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+ }
+
+ @Test
+ public void testRetryableExceptionArrayWithShutdownCode()
+ throws Exception {
+ JsonNode response =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+ + QueryErrorCode.SERVER_SHUTTING_DOWN.getId() + ",\"message\":\"Server
shutting down\"}]}");
+
+
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+ }
+
+ @Test
+ public void testNonRetryableExceptionArray()
+ throws Exception {
+ JsonNode response =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+ + QueryErrorCode.SQL_PARSING.getId() + ",\"message\":\"Bad
query\"}]}");
+
+
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+ }
+
+ @Test
+ public void testExceptionWithoutErrorCodeIsNotRetryable()
+ throws Exception {
+ JsonNode responseWithArray =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"code\":\""
+ + QueryErrorCode.BROKER_INSTANCE_MISSING.name() + "\",\"message\":\"No
routing table\"}]}");
+ JsonNode responseWithObject =
JsonUtils.stringToJsonNode("{\"exceptions\":{\"code\":\""
+ + QueryErrorCode.BROKER_INSTANCE_MISSING.name() + "\",\"message\":\"No
routing table\"}}");
+
+
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(responseWithArray.get("exceptions")));
+
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(responseWithObject.get("exceptions")));
+ RuntimeException thrown = expectThrows(RuntimeException.class,
+ () -> StreamOp.extractTotalDocs(QUERY, responseWithArray));
+
assertTrue(thrown.getMessage().contains(QueryErrorCode.BROKER_INSTANCE_MISSING.name()));
+ }
+
+ @Test
+ public void testMixedExceptionArrayIsNotRetryable()
+ throws Exception {
+ JsonNode response =
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+ + QueryErrorCode.BROKER_INSTANCE_MISSING.getId() + ",\"message\":\""
+ + QueryErrorCode.BROKER_INSTANCE_MISSING.getDefaultMessage() +
"\"},{\"errorCode\":"
+ + QueryErrorCode.SQL_PARSING.getId() + ",\"message\":\""
+ + QueryErrorCode.SQL_PARSING.getDefaultMessage() + "\"}]}");
+
+
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+ RuntimeException thrown = expectThrows(RuntimeException.class, () ->
StreamOp.extractTotalDocs(QUERY, response));
+
assertTrue(thrown.getMessage().contains(QueryErrorCode.SQL_PARSING.getDefaultMessage()));
+ }
+
+ @Test
+ public void testWaitForDocsRetriesTransientFailures()
+ throws Exception {
+ List<JsonNode> responses = List.of(
+ responseWithErrorCodeObject(QueryErrorCode.BROKER_INSTANCE_MISSING),
+ responseWithErrorCodeArray(QueryErrorCode.QUERY_SCHEDULING_TIMEOUT),
+ responseWithErrorCodeArray(QueryErrorCode.SERVER_NOT_RESPONDING),
+ partialResultResponse(),
+ totalDocsResponse(12));
+ AtomicInteger attempts = new AtomicInteger();
+
+ new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs -> docs == 12, ()
-> "Failed to load docs", () -> {
+ int attempt = attempts.getAndIncrement();
+ return StreamOp.extractTotalDocs(QUERY, responses.get(attempt));
+ });
+
+ assertEquals(attempts.get(), responses.size());
+ }
+
+ @Test
+ public void testWaitForDocsRetriesThrownQueryException()
+ throws Exception {
+ AtomicInteger attempts = new AtomicInteger();
+
+ new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs -> docs == 12, ()
-> "Failed to load docs", () -> {
+ if (attempts.getAndIncrement() == 0) {
+ throw new StreamOp.RetryableQueryException("Broker unavailable");
+ }
+ return 12;
+ });
+
+ assertEquals(attempts.get(), 2);
+ }
+
+ @Test
+ public void testWaitForDocsFailsFastOnNonRetryableException()
+ throws Exception {
+ AtomicInteger attempts = new AtomicInteger();
+
+ RuntimeException thrown = expectThrows(RuntimeException.class,
+ () -> new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs ->
true, () -> "Failed to load docs", () -> {
+ attempts.incrementAndGet();
+ return StreamOp.extractTotalDocs(QUERY,
responseWithErrorCodeArray(QueryErrorCode.SQL_PARSING));
+ }));
+
+
assertTrue(thrown.getMessage().contains(QueryErrorCode.SQL_PARSING.getDefaultMessage()));
+ assertEquals(attempts.get(), 1);
+ }
+
+ private static JsonNode responseWithErrorCodeArray(QueryErrorCode
queryErrorCode)
+ throws Exception {
+ return JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":" +
queryErrorCode.getId()
+ + ",\"message\":\"" + queryErrorCode.getDefaultMessage() + "\"}]}");
+ }
+
+ private static JsonNode responseWithErrorCodeObject(QueryErrorCode
queryErrorCode)
+ throws Exception {
+ return JsonUtils.stringToJsonNode("{\"exceptions\":{\"errorCode\":" +
queryErrorCode.getId()
+ + ",\"message\":\"" + queryErrorCode.getDefaultMessage() + "\"}}");
+ }
+
+ private static JsonNode partialResultResponse()
+ throws Exception {
+ return
JsonUtils.stringToJsonNode("{\"numServersQueried\":2,\"numServersResponded\":1,\"totalDocs\":10}");
+ }
+
+ private static JsonNode totalDocsResponse(long totalDocs)
+ throws Exception {
+ return
JsonUtils.stringToJsonNode("{\"numServersQueried\":1,\"numServersResponded\":1,\"totalDocs\":"
+ totalDocs
+ + "}");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]