This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 48df78e476 Provide results in CompletableFuture for java clients and 
expose metrics (#10326)
48df78e476 is described below

commit 48df78e4764e1b004ebb19b403463ac0c666d612
Author: cgenrich <[email protected]>
AuthorDate: Mon Jun 26 19:05:50 2023 -0700

    Provide results in CompletableFuture for java clients and expose metrics 
(#10326)
---
 .../java/org/apache/pinot/client/Connection.java   | 72 +++++-----------
 .../client/JsonAsyncHttpPinotClientTransport.java  | 99 +++++++---------------
 .../apache/pinot/client/PinotClientTransport.java  | 18 +++-
 .../org/apache/pinot/client/PreparedStatement.java |  4 +-
 .../apache/pinot/client/ConnectionFactoryTest.java | 13 +++
 .../apache/pinot/client/PreparedStatementTest.java | 18 ++--
 .../apache/pinot/client/ResultSetGroupTest.java    |  6 +-
 .../pinot/client/DummyPinotClientTransport.java    |  6 +-
 .../apache/pinot/client/PinotResultSetTest.java    |  6 +-
 9 files changed, 96 insertions(+), 146 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index 6c15b674f8..49c12f2c8b 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -20,10 +20,7 @@ package org.apache.pinot.client;
 
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -39,24 +36,24 @@ public class Connection {
   public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(Connection.class);
 
-  private final PinotClientTransport _transport;
+  private final PinotClientTransport<?> _transport;
   private final BrokerSelector _brokerSelector;
   private final boolean _failOnExceptions;
 
-  Connection(List<String> brokerList, PinotClientTransport transport) {
+  Connection(List<String> brokerList, PinotClientTransport<?> transport) {
     this(new Properties(), new SimpleBrokerSelector(brokerList), transport);
   }
 
-  Connection(Properties properties, List<String> brokerList, 
PinotClientTransport transport) {
+  Connection(Properties properties, List<String> brokerList, 
PinotClientTransport<?> transport) {
     this(properties, new SimpleBrokerSelector(brokerList), transport);
     LOGGER.info("Created connection to broker list {}", brokerList);
   }
 
-  Connection(BrokerSelector brokerSelector, PinotClientTransport transport) {
+  Connection(BrokerSelector brokerSelector, PinotClientTransport<?> transport) 
{
     this(new Properties(), brokerSelector, transport);
   }
 
-  Connection(Properties properties, BrokerSelector brokerSelector, 
PinotClientTransport transport) {
+  Connection(Properties properties, BrokerSelector brokerSelector, 
PinotClientTransport<?> transport) {
     _brokerSelector = brokerSelector;
     _transport = transport;
 
@@ -150,7 +147,7 @@ public class Connection {
    * @return A future containing the result of the query
    * @throws PinotClientException If an exception occurs while processing the 
query
    */
-  public Future<ResultSetGroup> executeAsync(String query)
+  public CompletableFuture<ResultSetGroup> executeAsync(String query)
       throws PinotClientException {
     return executeAsync(null, query);
   }
@@ -163,7 +160,7 @@ public class Connection {
    * @throws PinotClientException If an exception occurs while processing the 
query
    */
   @Deprecated
-  public Future<ResultSetGroup> executeAsync(Request request)
+  public CompletableFuture<ResultSetGroup> executeAsync(Request request)
       throws PinotClientException {
     return executeAsync(null, request.getQuery());
   }
@@ -175,11 +172,14 @@ public class Connection {
    * @return A future containing the result of the query
    * @throws PinotClientException If an exception occurs while processing the 
query
    */
-  public Future<ResultSetGroup> executeAsync(@Nullable String tableName, 
String query)
+  public CompletableFuture<ResultSetGroup> executeAsync(@Nullable String 
tableName, String query)
       throws PinotClientException {
     String[] tableNames = (tableName == null) ? resolveTableName(query) : new 
String[]{tableName};
     String brokerHostPort = _brokerSelector.selectBroker(tableNames);
-    return new 
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
+    if (brokerHostPort == null) {
+      throw new PinotClientException("Could not find broker to query for 
statement: " + query);
+    }
+    return _transport.executeQueryAsync(brokerHostPort, 
query).thenApply(ResultSetGroup::new);
   }
 
   /**
@@ -220,43 +220,13 @@ public class Connection {
     _brokerSelector.close();
   }
 
-  private static class ResultSetGroupFuture implements Future<ResultSetGroup> {
-    private final Future<BrokerResponse> _responseFuture;
-
-    public ResultSetGroupFuture(Future<BrokerResponse> responseFuture) {
-      _responseFuture = responseFuture;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return _responseFuture.cancel(mayInterruptIfRunning);
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return _responseFuture.isCancelled();
-    }
-
-    @Override
-    public boolean isDone() {
-      return _responseFuture.isDone();
-    }
-
-    @Override
-    public ResultSetGroup get()
-        throws InterruptedException, ExecutionException {
-      try {
-        return get(60000L, TimeUnit.MILLISECONDS);
-      } catch (TimeoutException e) {
-        throw new ExecutionException(e);
-      }
-    }
-
-    @Override
-    public ResultSetGroup get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      BrokerResponse response = _responseFuture.get(timeout, unit);
-      return new ResultSetGroup(response);
-    }
+  /**
+   * Provides access to the underlying transport mechanism for this connection.
+   * There may be client metrics useful for monitoring and other observability 
goals.
+   *
+   * @return pinot client transport.
+   */
+  public PinotClientTransport<?> getTransport() {
+    return _transport;
   }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index cd0e49139a..c87ae3fd80 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.client;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -28,8 +29,8 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
@@ -39,9 +40,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
 import org.asynchttpclient.Dsl;
-import org.asynchttpclient.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
 /**
  * JSON encoded Pinot client transport over AsyncHttpClient.
  */
-public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport 
{
+public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<ClientStats> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
   private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
   private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING = 
"groupByMode=sql;responseFormat=sql";
@@ -66,7 +67,7 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
     _headers = new HashMap<>();
     _scheme = CommonConstants.HTTP_PROTOCOL;
     _extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
-    _httpClient = Dsl.asyncHttpClient();
+    _httpClient = 
Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
   }
 
   public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String 
scheme, String extraOptionString,
@@ -82,7 +83,8 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
       builder.setSslContext(new JdkSslContext(sslContext, true, 
ClientAuth.OPTIONAL));
     }
 
-    builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
+    builder.setRequestTimeout(_brokerReadTimeout)
+        .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
         .setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
         .setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
         .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
appId))
@@ -103,7 +105,8 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
       builder.setSslContext(sslContext);
     }
 
-    builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
+    builder.setRequestTimeout(_brokerReadTimeout)
+        .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
         .setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
         .setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
         .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", 
appId))
@@ -122,7 +125,7 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
   }
 
   @Override
-  public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String 
query) {
+  public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, String query) {
     try {
       ObjectNode json = JsonNodeFactory.instance.objectNode();
       json.put("sql", query);
@@ -134,11 +137,23 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
       if (_headers != null) {
         _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
       }
+      LOGGER.debug("Sending query {} to {}", query, url);
+      return requestBuilder.addHeader("Content-Type", "application/json; 
charset=utf-8").setBody(json.toString())
+              .execute().toCompletableFuture().thenApply(httpResponse -> {
+        LOGGER.debug("Completed query, HTTP status is {}", 
httpResponse.getStatusCode());
+
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+                  "Pinot returned HTTP status " + httpResponse.getStatusCode() 
+ ", expected 200");
+        }
 
-      Future<Response> response =
-          requestBuilder.addHeader("Content-Type", "application/json; 
charset=utf-8").setBody(json.toString())
-              .execute();
-      return new BrokerResponseFuture(response, query, url, 
_brokerReadTimeout);
+        String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
+        try {
+          return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
+        } catch (JsonProcessingException e) {
+          throw new CompletionException(e);
+        }
+      });
     } catch (Exception e) {
       throw new PinotClientException(e);
     }
@@ -155,7 +170,7 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
   }
 
   @Override
-  public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
+  public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, Request request)
       throws PinotClientException {
     return executeQueryAsync(brokerAddress, request.getQuery());
   }
@@ -173,60 +188,8 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport {
     }
   }
 
-  private static class BrokerResponseFuture implements Future<BrokerResponse> {
-    private final Future<Response> _response;
-    private final String _query;
-    private final String _url;
-    private final long _brokerReadTimeout;
-
-    public BrokerResponseFuture(Future<Response> response, String query, 
String url, long brokerReadTimeout) {
-      _response = response;
-      _query = query;
-      _url = url;
-      _brokerReadTimeout = brokerReadTimeout;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return _response.cancel(mayInterruptIfRunning);
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return _response.isCancelled();
-    }
-
-    @Override
-    public boolean isDone() {
-      return _response.isDone();
-    }
-
-    @Override
-    public BrokerResponse get()
-        throws ExecutionException {
-      return get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public BrokerResponse get(long timeout, TimeUnit unit)
-        throws ExecutionException {
-      try {
-        LOGGER.debug("Sending query {} to {}", _query, _url);
-
-        Response httpResponse = _response.get(timeout, unit);
-
-        LOGGER.debug("Completed query, HTTP status is {}", 
httpResponse.getStatusCode());
-
-        if (httpResponse.getStatusCode() != 200) {
-          throw new PinotClientException(
-              "Pinot returned HTTP status " + httpResponse.getStatusCode() + 
", expected 200");
-        }
-
-        String responseBody = 
httpResponse.getResponseBody(StandardCharsets.UTF_8);
-        return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
-      } catch (Exception e) {
-        throw new ExecutionException(e);
-      }
-    }
+  @Override
+  public ClientStats getClientMetrics() {
+    return _httpClient.getClientStats();
   }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
index fbd398906a..9e4d2a6656 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
@@ -18,18 +18,18 @@
  */
 package org.apache.pinot.client;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 
 /**
  * Interface for plugging different client transports.
  */
-public interface PinotClientTransport {
+public interface PinotClientTransport<METRICS> {
 
   BrokerResponse executeQuery(String brokerAddress, String query)
       throws PinotClientException;
 
-  Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query)
+  CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, 
String query)
       throws PinotClientException;
 
   @Deprecated
@@ -37,9 +37,19 @@ public interface PinotClientTransport {
       throws PinotClientException;
 
   @Deprecated
-  Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request 
request)
+  CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
       throws PinotClientException;
 
   void close()
       throws PinotClientException;
+
+  /**
+   * Access to the client metrics implementation if any.
+   * This may be useful for observability into the client implementation.
+   *
+   * @return underlying client metrics if any
+   */
+  default METRICS getClientMetrics() {
+    throw new UnsupportedOperationException("No useful client metrics 
available");
+  }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
index 314c94989b..600627f5fc 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.client;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 
 /**
@@ -72,7 +72,7 @@ public class PreparedStatement {
    *
    * @return The query results
    */
-  public Future<ResultSetGroup> executeAsync() {
+  public CompletableFuture<ResultSetGroup> executeAsync() {
     return _connection.executeAsync(fillStatementWithParameters());
   }
 
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 037e8bc9af..5a755afc55 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -101,6 +101,19 @@ public class ConnectionFactoryTest {
     Assert.assertEquals(connection.getBrokerList(), brokers);
   }
 
+  @Test
+  public void testConnectionTransport() {
+    // Create properties
+    Properties properties = new Properties();
+    properties.setProperty("brokerList", "127.0.0.1:1234,localhost:2345");
+
+    // Create the connection
+    Connection connection = ConnectionFactory.fromProperties(properties);
+
+    Assert.assertNotNull(connection.getTransport());
+    Assert.assertNotNull(connection.getTransport().getClientMetrics());
+  }
+
   // For testing DynamicBrokerSelector
 
   /**
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
index f2ba5f5843..0da2a2f260 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.client;
 
 import java.util.Collections;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -79,27 +79,21 @@ public class PreparedStatementTest {
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
String query)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, String query)
         throws PinotClientException {
-      _lastBrokerAddress = brokerAddress;
-      _lastQuery = query;
-      return null;
+      return CompletableFuture.completedFuture(executeQuery(brokerAddress, 
query));
     }
 
     @Override
     public BrokerResponse executeQuery(String brokerAddress, Request request)
         throws PinotClientException {
-      _lastBrokerAddress = brokerAddress;
-      _lastQuery = request.getQuery();
-      return BrokerResponse.empty();
+      return executeQuery(brokerAddress, request.getQuery());
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, Request request)
         throws PinotClientException {
-      _lastBrokerAddress = brokerAddress;
-      _lastQuery = request.getQuery();
-      return null;
+      return executeQueryAsync(brokerAddress, request.getQuery());
     }
 
     public String getLastQuery() {
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
index c513587696..91590a413c 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.client;
 import java.io.InputStream;
 import java.util.Collections;
 import java.util.Properties;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -165,7 +165,7 @@ public class ResultSetGroupTest {
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
String query)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, String query)
         throws PinotClientException {
       return null;
     }
@@ -177,7 +177,7 @@ public class ResultSetGroupTest {
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, Request request)
         throws PinotClientException {
       return null;
     }
diff --git 
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
 
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
index 86ad814ffa..40a0dbd768 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.client;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 
 public class DummyPinotClientTransport implements PinotClientTransport {
@@ -32,7 +32,7 @@ public class DummyPinotClientTransport implements 
PinotClientTransport {
   }
 
   @Override
-  public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String 
query)
+  public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, String query)
       throws PinotClientException {
     _lastQuery = query;
     return null;
@@ -46,7 +46,7 @@ public class DummyPinotClientTransport implements 
PinotClientTransport {
   }
 
   @Override
-  public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
+  public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, Request request)
       throws PinotClientException {
     _lastQuery = request.getQuery();
     return null;
diff --git 
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
 
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
index 7e92cc1040..abcddfbe9e 100644
--- 
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
+++ 
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
@@ -23,7 +23,7 @@ import java.sql.ResultSetMetaData;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.client.utils.DateTimeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -189,7 +189,7 @@ public class PinotResultSetTest {
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
String query)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, String query)
         throws PinotClientException {
       return null;
     }
@@ -201,7 +201,7 @@ public class PinotResultSetTest {
     }
 
     @Override
-    public Future<BrokerResponse> executeQueryAsync(String brokerAddress, 
Request request)
+    public CompletableFuture<BrokerResponse> executeQueryAsync(String 
brokerAddress, Request request)
         throws PinotClientException {
       return null;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to