Jackie-Jiang commented on code in PR #18532:
URL: https://github.com/apache/pinot/pull/18532#discussion_r3268636938


##########
pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java:
##########
@@ -301,15 +321,105 @@ private long fetchExistingTotalDocs(String tableName)
     return response.get(TOTAL_DOCS).asLong();
   }
 
-  private void waitForDocsLoaded(String tableName, long timeoutMs) {
+  private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+      throws Exception {
+    AtomicLong loadedDocs = new AtomicLong(-1);
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return true;
+    }, () -> "Failed to fetch existing documents for table: " + tableName);
+    return loadedDocs.get();
+  }
+
+  private void waitForDocsLoaded(String tableName, long timeoutMs)
+      throws Exception {
     LOGGER.info("Wait Doc to load ...");
     AtomicLong loadedDocs = new AtomicLong(-1);
-    TestUtils.waitForCondition(
-        () -> {
-          long existingTotalDocs = fetchExistingTotalDocs(tableName);
-          loadedDocs.set(existingTotalDocs);
-          return existingTotalDocs == targetDocs;
-        }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents. Found " + loadedDocs.get() + " instead",
-        Duration.ofSeconds(1));
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return existingTotalDocs == targetDocs;
+    }, () -> "Failed to load " + targetDocs + " documents. Found " + loadedDocs.get() + " instead");
+  }
+
+  private void waitForDocs(String tableName, long timeoutMs, LongPredicate condition, Supplier<String> errorMessage)
+      throws Exception {
+    waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS, condition, errorMessage,
+        () -> fetchExistingTotalDocs(tableName));
+  }
+
+  void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs, LongPredicate condition,
+      Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+      throws Exception {
+    long endTimeMs = System.currentTimeMillis() + timeoutMs;
+    long nextLogTimeMs = 0;
+    RetryableQueryException lastRetryableQueryException = null;
+    while (System.currentTimeMillis() < endTimeMs) {
+      try {
+        if (condition.test(totalDocsSupplier.getAsLong())) {
+          return;
+        }
+      } catch (RetryableQueryException e) {
+        lastRetryableQueryException = e;
+        long currentTimeMs = System.currentTimeMillis();
+        if (currentTimeMs >= nextLogTimeMs) {
+          LOGGER.warn("Unable to fetch total docs for table: {}. Trying again", tableName, e);
+          nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+        }
+      }
+      Thread.sleep(retryIntervalMs);
+    }
+    throw new RuntimeException(errorMessage.get(), lastRetryableQueryException);
+  }
+
+  static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+    if (exceptions.isArray()) {
+      if (exceptions.isEmpty()) {
+        return false;
+      }
+      for (JsonNode exception : exceptions) {
+        if (!isRetryableQueryException(exception)) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return isRetryableQueryException(exceptions);
+  }
+
+  private static boolean isRetryableQueryException(JsonNode exception) {
+    Integer errorCode = getErrorCode(exception);
+    return errorCode != null && RETRYABLE_QUERY_ERROR_CODES.contains(errorCode);
+  }
+
+  private static Integer getErrorCode(JsonNode exception) {
+    JsonNode errorCode = exception.get(ERROR_CODE);
+    if (errorCode != null) {
+      if (errorCode.canConvertToInt()) {
+        return errorCode.asInt();
+      }
+      if (errorCode.isTextual()) {
+        try {
+          return Integer.parseInt(errorCode.asText());
+        } catch (NumberFormatException e) {
+          return null;
+        }
+      }
+    }
+    return null;

Review Comment:
   ```suggestion
       return errorCode != null ? errorCode.asInt() : null;
   ```



##########
pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java:
##########
@@ -301,15 +321,105 @@ private long fetchExistingTotalDocs(String tableName)
     return response.get(TOTAL_DOCS).asLong();
   }
 
-  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs) {
+  private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+      throws Exception {
+    AtomicLong loadedDocs = new AtomicLong(-1);
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return true;
+    }, () -> "Failed to fetch existing documents for table: " + tableName);
+    return loadedDocs.get();
+  }
+
+  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs)
+      throws Exception {
     LOGGER.info("Wait Doc to load ...");
     AtomicLong loadedDocs = new AtomicLong(-1);
-    TestUtils.waitForCondition(
-        () -> {
-          long existingTotalDocs = fetchExistingTotalDocs(tableName);
-          loadedDocs.set(existingTotalDocs);
-          return existingTotalDocs == targetDocs;
-        }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents. 
Found " + loadedDocs.get() + " instead",
-        Duration.ofSeconds(1));
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return existingTotalDocs == targetDocs;
+    }, () -> "Failed to load " + targetDocs + " documents. Found " + 
loadedDocs.get() + " instead");
+  }
+
+  private void waitForDocs(String tableName, long timeoutMs, LongPredicate 
condition, Supplier<String> errorMessage)
+      throws Exception {
+    waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS, 
condition, errorMessage,
+        () -> fetchExistingTotalDocs(tableName));
+  }
+
+  void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs, 
LongPredicate condition,
+      Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+      throws Exception {
+    long endTimeMs = System.currentTimeMillis() + timeoutMs;
+    long nextLogTimeMs = 0;
+    RetryableQueryException lastRetryableQueryException = null;
+    while (System.currentTimeMillis() < endTimeMs) {
+      try {
+        if (condition.test(totalDocsSupplier.getAsLong())) {
+          return;
+        }
+      } catch (RetryableQueryException e) {
+        lastRetryableQueryException = e;
+        long currentTimeMs = System.currentTimeMillis();
+        if (currentTimeMs >= nextLogTimeMs) {
+          LOGGER.warn("Unable to fetch total docs for table: {}. Trying 
again", tableName, e);
+          nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+        }
+      }
+      Thread.sleep(retryIntervalMs);
+    }
+    throw new RuntimeException(errorMessage.get(), 
lastRetryableQueryException);
+  }
+
+  static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+    if (exceptions.isArray()) {
+      if (exceptions.isEmpty()) {
+        return false;

Review Comment:
   Why return `false` here?



##########
pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java:
##########
@@ -301,15 +321,105 @@ private long fetchExistingTotalDocs(String tableName)
     return response.get(TOTAL_DOCS).asLong();
   }
 
-  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs) {
+  private long waitForExistingTotalDocs(String tableName, long timeoutMs)
+      throws Exception {
+    AtomicLong loadedDocs = new AtomicLong(-1);
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return true;
+    }, () -> "Failed to fetch existing documents for table: " + tableName);
+    return loadedDocs.get();
+  }
+
+  private void waitForDocsLoaded(String tableName, long targetDocs, long 
timeoutMs)
+      throws Exception {
     LOGGER.info("Wait Doc to load ...");
     AtomicLong loadedDocs = new AtomicLong(-1);
-    TestUtils.waitForCondition(
-        () -> {
-          long existingTotalDocs = fetchExistingTotalDocs(tableName);
-          loadedDocs.set(existingTotalDocs);
-          return existingTotalDocs == targetDocs;
-        }, 100L, timeoutMs, "Failed to load " + targetDocs + " documents. 
Found " + loadedDocs.get() + " instead",
-        Duration.ofSeconds(1));
+    waitForDocs(tableName, timeoutMs, existingTotalDocs -> {
+      loadedDocs.set(existingTotalDocs);
+      return existingTotalDocs == targetDocs;
+    }, () -> "Failed to load " + targetDocs + " documents. Found " + 
loadedDocs.get() + " instead");
+  }
+
+  private void waitForDocs(String tableName, long timeoutMs, LongPredicate 
condition, Supplier<String> errorMessage)
+      throws Exception {
+    waitForDocs(tableName, timeoutMs, COUNT_QUERY_RETRY_INTERVAL_MS, 
condition, errorMessage,
+        () -> fetchExistingTotalDocs(tableName));
+  }
+
+  void waitForDocs(String tableName, long timeoutMs, long retryIntervalMs, 
LongPredicate condition,
+      Supplier<String> errorMessage, TotalDocsSupplier totalDocsSupplier)
+      throws Exception {
+    long endTimeMs = System.currentTimeMillis() + timeoutMs;
+    long nextLogTimeMs = 0;
+    RetryableQueryException lastRetryableQueryException = null;
+    while (System.currentTimeMillis() < endTimeMs) {
+      try {
+        if (condition.test(totalDocsSupplier.getAsLong())) {
+          return;
+        }
+      } catch (RetryableQueryException e) {
+        lastRetryableQueryException = e;
+        long currentTimeMs = System.currentTimeMillis();
+        if (currentTimeMs >= nextLogTimeMs) {
+          LOGGER.warn("Unable to fetch total docs for table: {}. Trying 
again", tableName, e);
+          nextLogTimeMs = currentTimeMs + Duration.ofSeconds(1).toMillis();
+        }
+      }
+      Thread.sleep(retryIntervalMs);
+    }
+    throw new RuntimeException(errorMessage.get(), 
lastRetryableQueryException);
+  }
+
+  static boolean hasOnlyRetryableQueryExceptions(JsonNode exceptions) {
+    if (exceptions.isArray()) {

Review Comment:
   Several checks are unnecessarily defensive.



##########
pinot-compatibility-verifier/src/main/java/org/apache/pinot/compat/StreamOp.java:
##########
@@ -266,21 +281,27 @@ private boolean produceData() {
   private long fetchExistingTotalDocs(String tableName)
       throws Exception {
     String query = "SELECT count(*) FROM " + tableName;
-    JsonNode response = Utils.postSqlQuery(query, ClusterDescriptor.getInstance().getBrokerUrl());
+    JsonNode response;
+    try {
+      response = Utils.postSqlQuery(query, ClusterDescriptor.getInstance().getBrokerUrl());
+    } catch (Exception e) {
+      throw new RetryableQueryException(String.format("Failed to query Table: %s", tableName), e);

Review Comment:
   Why always treat it as retryable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to