Copilot commented on code in PR #16782:
URL: https://github.com/apache/pinot/pull/16782#discussion_r2337678255


##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java:
##########
@@ -205,4 +204,281 @@ public void close()
   public PinotClientTransport<?> getTransport() {
     return _transport;
   }
+
+  // ========== Cursor Methods (Default implementations for backward 
compatibility) ==========
+
+  /**
+   * Opens a cursor for paginated query execution using the Cursor Handle 
Pattern.
+   * The returned cursor starts with the first page already loaded.
+   *
+   * @param query the query to execute
+   * @param pageSize the number of rows per page
+   * @return ResultCursor with the first page loaded and ready for navigation
+   * @throws PinotClientException If an exception occurs while processing the 
query
+   */
+  public ResultCursor openCursor(String query, int pageSize) throws 
PinotClientException {
+    // Select broker for cursor operation
+    String brokerHostPort = _brokerSelector.selectBroker((String) null);
+    if (brokerHostPort == null) {
+      throw new PinotClientException("Could not find broker to execute cursor 
query");
+    }
+
+    try {
+      CursorAwareBrokerResponse initialResponse = 
_transport.executeQueryWithCursor(brokerHostPort, query, pageSize);
+      if (initialResponse.hasExceptions() && _failOnExceptions) {
+        throw new PinotClientException("Query had processing exceptions: \n" + 
initialResponse.getExceptions());
+      }
+
+      return new ResultCursorImpl(_transport, brokerHostPort, initialResponse, 
_failOnExceptions);
+    } catch (UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Cursor operations not supported 
by this connection type", e);
+    } catch (Exception e) {
+      throw new PinotClientException("Failed to open cursor", e);
+    }
+  }
+
+  /**
+   * Executes a query with cursor support for pagination.
+   *
+   * @deprecated Use openCursor(String, int) instead for better resource 
management
+   * @param query the query to execute
+   * @param numRows the number of rows per page
+   * @return CursorResultSetGroup containing the first page and cursor metadata
+   * @throws PinotClientException If an exception occurs while processing the 
query
+   */
+  @Deprecated
+  public CursorResultSetGroup executeCursorQuery(String query, int numRows) 
throws PinotClientException {
+    // Select broker for the query
+    String[] tableNames = resolveTableName(query);
+    String brokerHostPort = _brokerSelector.selectBroker(tableNames);
+    if (brokerHostPort == null) {
+      throw new PinotClientException("Could not find broker to execute cursor 
query");
+    }
+
+    try {
+      // Execute query with cursor support
+      CursorAwareBrokerResponse response = 
_transport.executeQueryWithCursor(brokerHostPort, query, numRows);
+      if (response.hasExceptions() && _failOnExceptions) {
+        throw new PinotClientException("Query had processing exceptions: \n" + 
response.getExceptions());
+      }
+
+      // Create cursor result set group
+      return new CursorResultSetGroup(response);
+    } catch (UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Cursor operations not supported 
by this connection type", e);
+    } catch (Exception e) {
+      throw new PinotClientException("Failed to execute cursor query", e);
+    }
+  }
+
+  /**
+   * Executes a query with cursor support asynchronously.
+   *
+   * @param query The SQL query to execute
+   * @param pageSize The number of rows per page
+   * @return CompletableFuture containing CursorResultSetGroup
+   */
+  public CompletableFuture<CursorResultSetGroup> 
executeCursorQueryAsync(String query, int pageSize) {

Review Comment:
   The cast of _transport in this method is unchecked: it will throw a 
ClassCastException if _transport is not a JsonAsyncHttpPinotClientTransport. 
Add an instanceof check before casting.
   ```suggestion
     public CompletableFuture<CursorResultSetGroup> 
executeCursorQueryAsync(String query, int pageSize) {
       if (!(_transport instanceof JsonAsyncHttpPinotClientTransport)) {
         return CompletableFuture.failedFuture(
             new UnsupportedOperationException("Cursor operations not supported 
by this connection type"));
       }
   ```



##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Extended BrokerResponse with cursor-specific fields for cursor pagination 
queries.
+ * This class adds cursor metadata fields while maintaining full compatibility 
with BrokerResponse.
+ */
+public class CursorAwareBrokerResponse extends BrokerResponse {
+  // Cursor-specific fields from Pinot documentation
+  private final Long _offset;
+  private final Integer _numRows;
+  private final Long _numRowsResultSet;
+  private Long _cursorResultWriteTimeMs;
+  private Long _submissionTimeMs;
+  private Long _expirationTimeMs;
+  private final String _brokerHost;
+  private Integer _brokerPort;
+  private Long _bytesWritten;
+  private final Long _cursorFetchTimeMs;
+
+  /**
+   * Creates a CursorAwareBrokerResponse by parsing cursor-specific fields 
from the JSON response.
+   */
+  public static CursorAwareBrokerResponse fromJson(JsonNode brokerResponse) {
+    return new CursorAwareBrokerResponse(brokerResponse);
+  }
+
+
+  private CursorAwareBrokerResponse(JsonNode brokerResponse) {
+    super(brokerResponse); // Initialize base BrokerResponse fields
+
+    // Parse cursor-specific fields
+    _offset = brokerResponse.has("offset") ? 
brokerResponse.get("offset").asLong() : null;
+    _numRows = brokerResponse.has("numRows") ? 
brokerResponse.get("numRows").asInt() : null;
+    _numRowsResultSet = brokerResponse.has("numRowsResultSet") ? 
brokerResponse.get("numRowsResultSet").asLong() : null;
+    JsonNode cursorResultWriteTimeMsNode = 
brokerResponse.get("cursorResultWriteTimeMs");
+    _cursorResultWriteTimeMs = cursorResultWriteTimeMsNode != null ? 
cursorResultWriteTimeMsNode.asLong() : null;
+    JsonNode expirationTimeMsNode = brokerResponse.get("expirationTimeMs");
+    _expirationTimeMs = expirationTimeMsNode != null ? 
expirationTimeMsNode.asLong() : null;
+    _submissionTimeMs = brokerResponse.has("submissionTimeMs") ? 
brokerResponse.get("submissionTimeMs").asLong() : null;
+    _brokerHost = brokerResponse.has("brokerHost") ? 
brokerResponse.get("brokerHost").asText() : null;
+    _brokerPort = brokerResponse.has("brokerPort") ? 
brokerResponse.get("brokerPort").asInt() : null;
+    _bytesWritten = brokerResponse.has("bytesWritten") ? 
brokerResponse.get("bytesWritten").asLong() : null;
+    _cursorFetchTimeMs = brokerResponse.has("cursorFetchTimeMs")
+        ? brokerResponse.get("cursorFetchTimeMs").asLong() : null;
+  }
+
+
+

Review Comment:
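   [nitpick] The JsonNode null-checking pattern is repeated for every field, 
and inconsistently (some fields use has(), others get() != null). Consider 
extracting it into helper methods to reduce duplication and improve 
maintainability. Note that the helpers suggested below also treat an explicit 
JSON null as absent, whereas the current code would return 0 (or the text 
"null" for brokerHost) for such a field.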
   ```suggestion
       // Parse cursor-specific fields using helper methods
       _offset = getLongOrNull(brokerResponse, "offset");
       _numRows = getIntOrNull(brokerResponse, "numRows");
       _numRowsResultSet = getLongOrNull(brokerResponse, "numRowsResultSet");
       _cursorResultWriteTimeMs = getLongOrNull(brokerResponse, 
"cursorResultWriteTimeMs");
       _expirationTimeMs = getLongOrNull(brokerResponse, "expirationTimeMs");
       _submissionTimeMs = getLongOrNull(brokerResponse, "submissionTimeMs");
       _brokerHost = getTextOrNull(brokerResponse, "brokerHost");
       _brokerPort = getIntOrNull(brokerResponse, "brokerPort");
       _bytesWritten = getLongOrNull(brokerResponse, "bytesWritten");
       _cursorFetchTimeMs = getLongOrNull(brokerResponse, "cursorFetchTimeMs");
     }
   
     // Helper methods for extracting values from JsonNode with null checks
     private static Long getLongOrNull(JsonNode node, String fieldName) {
       JsonNode valueNode = node.get(fieldName);
       return (valueNode != null && !valueNode.isNull()) ? valueNode.asLong() : 
null;
     }
   
     private static Integer getIntOrNull(JsonNode node, String fieldName) {
       JsonNode valueNode = node.get(fieldName);
       return (valueNode != null && !valueNode.isNull()) ? valueNode.asInt() : 
null;
     }
   
     private static String getTextOrNull(JsonNode node, String fieldName) {
       JsonNode valueNode = node.get(fieldName);
       return (valueNode != null && !valueNode.isNull()) ? valueNode.asText() : 
null;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to