Copilot commented on code in PR #16782:
URL: https://github.com/apache/pinot/pull/16782#discussion_r2337678255
##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java:
##########
@@ -205,4 +204,281 @@ public void close()
public PinotClientTransport<?> getTransport() {
return _transport;
}
+
+ // ========== Cursor Methods (Default implementations for backward
compatibility) ==========
+
+ /**
+ * Opens a cursor for paginated query execution using the Cursor Handle
Pattern.
+ * The returned cursor starts with the first page already loaded.
+ *
+ * @param query the query to execute
+ * @param pageSize the number of rows per page
+ * @return ResultCursor with the first page loaded and ready for navigation
+ * @throws PinotClientException If an exception occurs while processing the
query
+ */
+ public ResultCursor openCursor(String query, int pageSize) throws
PinotClientException {
+ // Select broker for cursor operation
+ String brokerHostPort = _brokerSelector.selectBroker((String) null);
+ if (brokerHostPort == null) {
+ throw new PinotClientException("Could not find broker to execute cursor
query");
+ }
+
+ try {
+ CursorAwareBrokerResponse initialResponse =
_transport.executeQueryWithCursor(brokerHostPort, query, pageSize);
+ if (initialResponse.hasExceptions() && _failOnExceptions) {
+ throw new PinotClientException("Query had processing exceptions: \n" +
initialResponse.getExceptions());
+ }
+
+ return new ResultCursorImpl(_transport, brokerHostPort, initialResponse,
_failOnExceptions);
+ } catch (UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Cursor operations not supported
by this connection type", e);
+ } catch (Exception e) {
+ throw new PinotClientException("Failed to open cursor", e);
+ }
+ }
+
+ /**
+ * Executes a query with cursor support for pagination.
+ *
+ * @deprecated Use openCursor(String, int) instead for better resource
management
+ * @param query the query to execute
+ * @param numRows the number of rows per page
+ * @return CursorResultSetGroup containing the first page and cursor metadata
+ * @throws PinotClientException If an exception occurs while processing the
query
+ */
+ @Deprecated
+ public CursorResultSetGroup executeCursorQuery(String query, int numRows)
throws PinotClientException {
+ // Select broker for the query
+ String[] tableNames = resolveTableName(query);
+ String brokerHostPort = _brokerSelector.selectBroker(tableNames);
+ if (brokerHostPort == null) {
+ throw new PinotClientException("Could not find broker to execute cursor
query");
+ }
+
+ try {
+ // Execute query with cursor support
+ CursorAwareBrokerResponse response =
_transport.executeQueryWithCursor(brokerHostPort, query, numRows);
+ if (response.hasExceptions() && _failOnExceptions) {
+ throw new PinotClientException("Query had processing exceptions: \n" +
response.getExceptions());
+ }
+
+ // Create cursor result set group
+ return new CursorResultSetGroup(response);
+ } catch (UnsupportedOperationException e) {
+ throw new UnsupportedOperationException("Cursor operations not supported
by this connection type", e);
+ } catch (Exception e) {
+ throw new PinotClientException("Failed to execute cursor query", e);
+ }
+ }
+
+ /**
+ * Executes a query with cursor support asynchronously.
+ *
+ * @param query The SQL query to execute
+ * @param pageSize The number of rows per page
+ * @return CompletableFuture containing CursorResultSet
+ */
+ public CompletableFuture<CursorResultSetGroup>
executeCursorQueryAsync(String query, int pageSize) {
Review Comment:
Unsafe cast without type checking. This will throw ClassCastException if
_transport is not a JsonAsyncHttpPinotClientTransport. Add instanceof check
before casting.
```suggestion
public CompletableFuture<CursorResultSetGroup>
executeCursorQueryAsync(String query, int pageSize) {
if (!(_transport instanceof JsonAsyncHttpPinotClientTransport)) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("Cursor operations not supported
by this connection type"));
}
```
##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Extended BrokerResponse with cursor-specific fields for cursor pagination
queries.
+ * This class adds cursor metadata fields while maintaining full compatibility
with BrokerResponse.
+ */
+public class CursorAwareBrokerResponse extends BrokerResponse {
+ // Cursor-specific fields from Pinot documentation
+ private final Long _offset;
+ private final Integer _numRows;
+ private final Long _numRowsResultSet;
+ private Long _cursorResultWriteTimeMs;
+ private Long _submissionTimeMs;
+ private Long _expirationTimeMs;
+ private final String _brokerHost;
+ private Integer _brokerPort;
+ private Long _bytesWritten;
+ private final Long _cursorFetchTimeMs;
+
+ /**
+ * Creates a CursorAwareBrokerResponse by parsing cursor-specific fields
from the JSON response.
+ */
+ public static CursorAwareBrokerResponse fromJson(JsonNode brokerResponse) {
+ return new CursorAwareBrokerResponse(brokerResponse);
+ }
+
+
+ private CursorAwareBrokerResponse(JsonNode brokerResponse) {
+ super(brokerResponse); // Initialize base BrokerResponse fields
+
+ // Parse cursor-specific fields
+ _offset = brokerResponse.has("offset") ?
brokerResponse.get("offset").asLong() : null;
+ _numRows = brokerResponse.has("numRows") ?
brokerResponse.get("numRows").asInt() : null;
+ _numRowsResultSet = brokerResponse.has("numRowsResultSet") ?
brokerResponse.get("numRowsResultSet").asLong() : null;
+ JsonNode cursorResultWriteTimeMsNode =
brokerResponse.get("cursorResultWriteTimeMs");
+ _cursorResultWriteTimeMs = cursorResultWriteTimeMsNode != null ?
cursorResultWriteTimeMsNode.asLong() : null;
+ JsonNode expirationTimeMsNode = brokerResponse.get("expirationTimeMs");
+ _expirationTimeMs = expirationTimeMsNode != null ?
expirationTimeMsNode.asLong() : null;
+ _submissionTimeMs = brokerResponse.has("submissionTimeMs") ?
brokerResponse.get("submissionTimeMs").asLong() : null;
+ _brokerHost = brokerResponse.has("brokerHost") ?
brokerResponse.get("brokerHost").asText() : null;
+ _brokerPort = brokerResponse.has("brokerPort") ?
brokerResponse.get("brokerPort").asInt() : null;
+ _bytesWritten = brokerResponse.has("bytesWritten") ?
brokerResponse.get("bytesWritten").asLong() : null;
+ _cursorFetchTimeMs = brokerResponse.has("cursorFetchTimeMs")
+ ? brokerResponse.get("cursorFetchTimeMs").asLong() : null;
+ }
+
+
+
Review Comment:
[nitpick] The null-checking pattern for JsonNode is repeated. Consider
extracting this into a helper method to reduce duplication and improve
maintainability.
```suggestion
// Parse cursor-specific fields using helper methods
_offset = getLongOrNull(brokerResponse, "offset");
_numRows = getIntOrNull(brokerResponse, "numRows");
_numRowsResultSet = getLongOrNull(brokerResponse, "numRowsResultSet");
_cursorResultWriteTimeMs = getLongOrNull(brokerResponse,
"cursorResultWriteTimeMs");
_expirationTimeMs = getLongOrNull(brokerResponse, "expirationTimeMs");
_submissionTimeMs = getLongOrNull(brokerResponse, "submissionTimeMs");
_brokerHost = getTextOrNull(brokerResponse, "brokerHost");
_brokerPort = getIntOrNull(brokerResponse, "brokerPort");
_bytesWritten = getLongOrNull(brokerResponse, "bytesWritten");
_cursorFetchTimeMs = getLongOrNull(brokerResponse, "cursorFetchTimeMs");
}
// Helper methods for extracting values from JsonNode with null checks
private static Long getLongOrNull(JsonNode node, String fieldName) {
JsonNode valueNode = node.get(fieldName);
return (valueNode != null && !valueNode.isNull()) ? valueNode.asLong() :
null;
}
private static Integer getIntOrNull(JsonNode node, String fieldName) {
JsonNode valueNode = node.get(fieldName);
return (valueNode != null && !valueNode.isNull()) ? valueNode.asInt() :
null;
}
private static String getTextOrNull(JsonNode node, String fieldName) {
JsonNode valueNode = node.get(fieldName);
return (valueNode != null && !valueNode.isNull()) ? valueNode.asText() :
null;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]