This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ca0dfc10dae Fix compatibility stream count retries (#18532)
ca0dfc10dae is described below

commit ca0dfc10dae24a5f567209c990ffc94081100630
Author: Xiang Fu <[email protected]>
AuthorDate: Tue May 19 20:56:31 2026 -0700

    Fix compatibility stream count retries (#18532)
    
    The compatibility verifier polls SELECT count(*) while rolling brokers and
    servers between versions. During those transitions Pinot can return transient
    broker/server errors or partial count responses until routing and query
    scheduling stabilize.
    
    Retry only those transient polling failures, keep non-retryable query errors
    fail-fast, and cover the retry loop with focused StreamOp tests.
---
 .../java/org/apache/pinot/compat/StreamOp.java     | 138 +++++++++++++---
 .../main/java/org/apache/pinot/compat/Utils.java   |   5 +-
 .../java/org/apache/pinot/compat/StreamOpTest.java | 174 +++++++++++++++++++++
 3 files changed, 294 insertions(+), 23 deletions(-)

diff --git a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
index 941abcf6afa..b58987a5310 100644
--- a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
+++ b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,6 +33,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongPredicate;
+import java.util.function.Supplier;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -52,7 +55,6 @@ import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +94,21 @@ public class StreamOp extends BaseOp {
   private static final String NUM_SERVERS_QUERIED = "numServersQueried";
   private static final String NUM_SERVERS_RESPONEDED = "numServersResponded";
   private static final String TOTAL_DOCS = "totalDocs";
+  private static final long COUNT_QUERY_TIMEOUT_MS = 60_000L;
+  private static final long COUNT_QUERY_RETRY_INTERVAL_MS = 100L;
   private static final short KAFKA_REPLICATION_FACTOR = 1;
+  private static final Set<Integer> RETRYABLE_QUERY_ERROR_CODES = Set.of(
+      QueryErrorCode.BROKER_INSTANCE_MISSING.getId(),
+      QueryErrorCode.BROKER_RESOURCE_MISSING.getId(),
+      QueryErrorCode.BROKER_REQUEST_SEND.getId(),
+      QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE.getId(),
+      QueryErrorCode.BROKER_TIMEOUT.getId(),
+      QueryErrorCode.EXECUTION_TIMEOUT.getId(),
+      QueryErrorCode.QUERY_SCHEDULING_TIMEOUT.getId(),
+      QueryErrorCode.SERVER_SHUTTING_DOWN.getId(),
+      QueryErrorCode.SERVER_NOT_RESPONDING.getId(),
+      QueryErrorCode.SERVER_SEGMENT_MISSING.getId(),
+      QueryErrorCode.SERVER_TABLE_MISSING.getId());
 
   public StreamOp() {
     super(OpType.STREAM_OP);
@@ -195,7 +211,7 @@ public class StreamOp extends BaseOp {
       long existingTotalDoc = 0;
 
       // get original rows
-      existingTotalDoc = fetchExistingTotalDocs(tableName);
+      existingTotalDoc = waitForExistingTotalDocs(tableName, 
COUNT_QUERY_TIMEOUT_MS);
 
       // push csv data to kafka
       Properties publisherProps = new Properties();
@@ -254,7 +270,7 @@ public class StreamOp extends BaseOp {
       }
 
       // verify number of rows increases as expected
-      waitForDocsLoaded(tableName, existingTotalDoc + _numRows, 60_000L);
+      waitForDocsLoaded(tableName, existingTotalDoc + _numRows, 
COUNT_QUERY_TIMEOUT_MS);
       LOGGER.info("Verified {} new rows in table: {}", _numRows, tableName);
       return true;
     } catch (Exception e) {
@@ -266,21 +282,27 @@ public class StreamOp extends BaseOp {
   private long fetchExistingTotalDocs(String tableName)
       throws Exception {
     String query = "SELECT count(*) FROM " + tableName;
-    JsonNode response = Utils.postSqlQuery(query, 
ClusterDescriptor.getInstance().getBrokerUrl());
+    JsonNode response;
+    try {
+      response = Utils.postSqlQuery(query, 
ClusterDescriptor.getInstance().getBrokerUrl());
+    } catch (IOException e) {
+      throw new RetryableQueryException(String.format("Failed to query Table: 
%s", tableName), e);
+    }
+    return extractTotalDocs(query, response);
+  }
+
+  static long extractTotalDocs(String query, JsonNode response)
+      throws RetryableQueryException {
     if (response == null) {
-      String errorMsg = String.format("Failed to query Table: %s", tableName);
-      LOGGER.error(errorMsg);
-      throw new RuntimeException(errorMsg);
+      throw new RetryableQueryException(String.format("Failed when running 
query: %s; got null response", query));
     }
 
     if (response.has(EXCEPTIONS) && !response.get(EXCEPTIONS).isEmpty()) {
       String errorMsg =
           String.format("Failed when running query: '%s'; got 
exceptions:\n%s\n", query, response.toPrettyString());
       JsonNode exceptions = response.get(EXCEPTIONS);
-      JsonNode errorCode = exceptions.get(ERROR_CODE);
-      if (QueryErrorCode.BROKER_INSTANCE_MISSING.getId() == errorCode.asInt()) 
{
-        LOGGER.warn("{}.Trying again", errorMsg);
-        return 0;
+      if (hasOnlyRetryableQueryExceptions(exceptions)) {
+        throw new RetryableQueryException(errorMsg);
       }
       LOGGER.error(errorMsg);
       throw new RuntimeException(errorMsg);
@@ -289,8 +311,7 @@ public class StreamOp extends BaseOp {
     if (response.has(NUM_SERVERS_QUERIED) && 
response.has(NUM_SERVERS_RESPONEDED)
         && response.get(NUM_SERVERS_QUERIED).asInt() > 
response.get(NUM_SERVERS_RESPONEDED).asInt()) {
       String errorMsg = String.format("Failed when running query: %s; the 
response contains partial results", query);
-      LOGGER.error(errorMsg);
-      throw new RuntimeException(errorMsg);
+      throw new RetryableQueryException(errorMsg);
     }
 
     if (!response.has(TOTAL_DOCS)) {
@@ -301,15 +322,90 @@ public class StreamOp extends BaseOp {
     return response.get(TOTAL_DOCS).asLong();
   }
 
-  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs) {
+  private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+      throws Exception {
+    AtomicLong loadedDocs = new AtomicLong(-1);
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return true;
+    }, () -> "Failed to fetch existing documents for table: " + tableName);
+    return loadedDocs.get();
+  }
+
+  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs)
+      throws Exception {
     LOGGER.info("Wait Doc to load ...");
     AtomicLong loadedDocs = new AtomicLong(-1);
-    TestUtils.waitForCondition(
-        () -> {
-          long existingTotalDocs = fetchExistingTotalDocs(tableName);
-          loadedDocs.set(existingTotalDocs);
-          return existingTotalDocs == targetDocs;
-        }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents. 
Found " + loadedDocs.get() + " instead",
-        Duration.ofSeconds(1));
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return existingTotalDocs == targetDocs;
+    }, () -> "Failed to load " + targetDocs + " documents. Found " + 
loadedDocs.get() + " instead");
+  }
+
+  private void waitForDocs(String tableName, long timeoutMs, LongPredicate 
condition, Supplier<String> errorMessage)
+      throws Exception {
+    waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS, 
condition, errorMessage,
+        () -> fetchExistingTotalDocs(tableName));
+  }
+
+  void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs, 
LongPredicate condition,
+      Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+      throws Exception {
+    long endTimeMs = System.currentTimeMillis() + timeoutMs;
+    long nextLogTimeMs = 0;
+    RetryableQueryException lastRetryableQueryException = null;
+    while (System.currentTimeMillis() < endTimeMs) {
+      try {
+        if (condition.test(totalDocsSupplier.getAsLong())) {
+          return;
+        }
+      } catch (RetryableQueryException e) {
+        lastRetryableQueryException = e;
+        long currentTimeMs = System.currentTimeMillis();
+        if (currentTimeMs >= nextLogTimeMs) {
+          LOGGER.warn("Unable to fetch total docs for table: {}. Trying 
again", tableName, e);
+          nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+        }
+      }
+      Thread.sleep(retryIntervalMs);
+    }
+    throw new RuntimeException(errorMessage.get(), 
lastRetryableQueryException);
+  }
+
+  static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+    if (exceptions.isArray()) {
+      for (JsonNode exception : exceptions) {
+        if (!isRetryableQueryException(exception)) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return isRetryableQueryException(exceptions);
+  }
+
+  private static boolean isRetryableQueryException(JsonNode exception) {
+    Integer errorCode = getErrorCode(exception);
+    return errorCode != null && 
RETRYABLE_QUERY_ERROR_CODES.contains(errorCode);
+  }
+
+  private static Integer getErrorCode(JsonNode exception) {
+    JsonNode errorCode = exception.get(ERROR_CODE);
+    return errorCode != null ? errorCode.asInt() : null;
+  }
+
+  interface TotalDocsSupplier {
+    long getAsLong()
+        throws Exception;
+  }
+
+  static class RetryableQueryException extends Exception {
+    RetryableQueryException(String message) {
+      super(message);
+    }
+
+    RetryableQueryException(String message, Throwable cause) {
+      super(message, cause);
+    }
   }
 }
diff --git a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
index cf90c625b1f..969e3cd3820 100644
--- a/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
+++ b/pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/Utils.java
@@ -51,7 +51,7 @@ public class Utils {
   }
 
   public static JsonNode postSqlQuery(String query, String brokerBaseApiUrl)
-      throws Exception {
+      throws IOException {
     ObjectNode payload = JsonUtils.newObjectNode();
     payload.put("sql", query);
     payload.put("queryOptions", "groupByMode=sql;responseFormat=sql");
@@ -60,7 +60,8 @@ public class Utils {
         ControllerTest.sendPostRequest(brokerBaseApiUrl + "/query/sql", 
payload.toString()));
   }
 
-  public static JsonNode postMultiStageSqlQuery(String query, String 
brokerBaseApiUrl) throws Exception {
+  public static JsonNode postMultiStageSqlQuery(String query, String 
brokerBaseApiUrl)
+      throws IOException {
     ObjectNode payload = JsonUtils.newObjectNode();
     payload.put("sql", query);
     payload.put("queryOptions", "useMultistageEngine=true");
diff --git a/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java b/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java
new file mode 100644
index 00000000000..1855d3f1f3b
--- /dev/null
+++ b/pinot-compatibility-verifier/src/test/java/org/apache/pinot/compat/StreamOpTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.compat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests retry classification for broker responses returned while 
compatibility tests roll components.
+public class StreamOpTest {
+  private static final String QUERY = "SELECT count(*) FROM testTable";
+
+  @Test
+  public void testRetryableExceptionArrayWithNumericErrorCode()
+      throws Exception {
+    JsonNode response = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+        + QueryErrorCode.BROKER_INSTANCE_MISSING.getId() + ",\"message\":\"No 
routing table\"}]}");
+
+    
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+  }
+
+  @Test
+  public void testRetryableExceptionArrayWithBrokerSegmentUnavailableCode()
+      throws Exception {
+    JsonNode response = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+        + QueryErrorCode.BROKER_SEGMENT_UNAVAILABLE.getId() + 
",\"message\":\"Segment unavailable\"}]}");
+
+    
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+  }
+
+  @Test
+  public void testRetryableExceptionArrayWithShutdownCode()
+      throws Exception {
+    JsonNode response = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+        + QueryErrorCode.SERVER_SHUTTING_DOWN.getId() + ",\"message\":\"Server 
shutting down\"}]}");
+
+    
assertTrue(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+  }
+
+  @Test
+  public void testNonRetryableExceptionArray()
+      throws Exception {
+    JsonNode response = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+        + QueryErrorCode.SQL_PARSING.getId() + ",\"message\":\"Bad 
query\"}]}");
+
+    
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+  }
+
+  @Test
+  public void testExceptionWithoutErrorCodeIsNotRetryable()
+      throws Exception {
+    JsonNode responseWithArray = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"code\":\""
+        + QueryErrorCode.BROKER_INSTANCE_MISSING.name() + "\",\"message\":\"No 
routing table\"}]}");
+    JsonNode responseWithObject = 
JsonUtils.stringToJsonNode("{\"exceptions\":{\"code\":\""
+        + QueryErrorCode.BROKER_INSTANCE_MISSING.name() + "\",\"message\":\"No 
routing table\"}}");
+
+    
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(responseWithArray.get("exceptions")));
+    
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(responseWithObject.get("exceptions")));
+    RuntimeException thrown = expectThrows(RuntimeException.class,
+        () -> StreamOp.extractTotalDocs(QUERY, responseWithArray));
+    
assertTrue(thrown.getMessage().contains(QueryErrorCode.BROKER_INSTANCE_MISSING.name()));
+  }
+
+  @Test
+  public void testMixedExceptionArrayIsNotRetryable()
+      throws Exception {
+    JsonNode response = 
JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":"
+        + QueryErrorCode.BROKER_INSTANCE_MISSING.getId() + ",\"message\":\""
+        + QueryErrorCode.BROKER_INSTANCE_MISSING.getDefaultMessage() + 
"\"},{\"errorCode\":"
+        + QueryErrorCode.SQL_PARSING.getId() + ",\"message\":\""
+        + QueryErrorCode.SQL_PARSING.getDefaultMessage() + "\"}]}");
+
+    
assertFalse(StreamOp.hasOnlyRetryableQueryExceptions(response.get("exceptions")));
+    RuntimeException thrown = expectThrows(RuntimeException.class, () -> 
StreamOp.extractTotalDocs(QUERY, response));
+    
assertTrue(thrown.getMessage().contains(QueryErrorCode.SQL_PARSING.getDefaultMessage()));
+  }
+
+  @Test
+  public void testWaitForDocsRetriesTransientFailures()
+      throws Exception {
+    List<JsonNode> responses = List.of(
+        responseWithErrorCodeObject(QueryErrorCode.BROKER_INSTANCE_MISSING),
+        responseWithErrorCodeArray(QueryErrorCode.QUERY_SCHEDULING_TIMEOUT),
+        responseWithErrorCodeArray(QueryErrorCode.SERVER_NOT_RESPONDING),
+        partialResultResponse(),
+        totalDocsResponse(12));
+    AtomicInteger attempts = new AtomicInteger();
+
+    new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs -> docs == 12, () 
-> "Failed to load docs", () -> {
+      int attempt = attempts.getAndIncrement();
+      return StreamOp.extractTotalDocs(QUERY, responses.get(attempt));
+    });
+
+    assertEquals(attempts.get(), responses.size());
+  }
+
+  @Test
+  public void testWaitForDocsRetriesThrownQueryException()
+      throws Exception {
+    AtomicInteger attempts = new AtomicInteger();
+
+    new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs -> docs == 12, () 
-> "Failed to load docs", () -> {
+      if (attempts.getAndIncrement() == 0) {
+        throw new StreamOp.RetryableQueryException("Broker unavailable");
+      }
+      return 12;
+    });
+
+    assertEquals(attempts.get(), 2);
+  }
+
+  @Test
+  public void testWaitForDocsFailsFastOnNonRetryableException()
+      throws Exception {
+    AtomicInteger attempts = new AtomicInteger();
+
+    RuntimeException thrown = expectThrows(RuntimeException.class,
+        () -> new StreamOp().waitForDocs("testTable", 1_000L, 0L, docs -> 
true, () -> "Failed to load docs", () -> {
+          attempts.incrementAndGet();
+          return StreamOp.extractTotalDocs(QUERY, 
responseWithErrorCodeArray(QueryErrorCode.SQL_PARSING));
+        }));
+
+    
assertTrue(thrown.getMessage().contains(QueryErrorCode.SQL_PARSING.getDefaultMessage()));
+    assertEquals(attempts.get(), 1);
+  }
+
+  private static JsonNode responseWithErrorCodeArray(QueryErrorCode 
queryErrorCode)
+      throws Exception {
+    return JsonUtils.stringToJsonNode("{\"exceptions\":[{\"errorCode\":" + 
queryErrorCode.getId()
+        + ",\"message\":\"" + queryErrorCode.getDefaultMessage() + "\"}]}");
+  }
+
+  private static JsonNode responseWithErrorCodeObject(QueryErrorCode 
queryErrorCode)
+      throws Exception {
+    return JsonUtils.stringToJsonNode("{\"exceptions\":{\"errorCode\":" + 
queryErrorCode.getId()
+        + ",\"message\":\"" + queryErrorCode.getDefaultMessage() + "\"}}");
+  }
+
+  private static JsonNode partialResultResponse()
+      throws Exception {
+    return 
JsonUtils.stringToJsonNode("{\"numServersQueried\":2,\"numServersResponded\":1,\"totalDocs\":10}");
+  }
+
+  private static JsonNode totalDocsResponse(long totalDocs)
+      throws Exception {
+    return 
JsonUtils.stringToJsonNode("{\"numServersQueried\":1,\"numServersResponded\":1,\"totalDocs\":"
 + totalDocs
+        + "}");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to