This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 48df78e476 Provide results in CompletableFuture for java clients and expose metrics (#10326)
48df78e476 is described below
commit 48df78e4764e1b004ebb19b403463ac0c666d612
Author: cgenrich <[email protected]>
AuthorDate: Mon Jun 26 19:05:50 2023 -0700
Provide results in CompletableFuture for java clients and expose metrics (#10326)
---
.../java/org/apache/pinot/client/Connection.java | 72 +++++-----------
.../client/JsonAsyncHttpPinotClientTransport.java | 99 +++++++---------------
.../apache/pinot/client/PinotClientTransport.java | 18 +++-
.../org/apache/pinot/client/PreparedStatement.java | 4 +-
.../apache/pinot/client/ConnectionFactoryTest.java | 13 +++
.../apache/pinot/client/PreparedStatementTest.java | 18 ++--
.../apache/pinot/client/ResultSetGroupTest.java | 6 +-
.../pinot/client/DummyPinotClientTransport.java | 6 +-
.../apache/pinot/client/PinotResultSetTest.java | 6 +-
9 files changed, 96 insertions(+), 146 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index 6c15b674f8..49c12f2c8b 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -20,10 +20,7 @@ package org.apache.pinot.client;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -39,24 +36,24 @@ public class Connection {
public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
private static final Logger LOGGER =
LoggerFactory.getLogger(Connection.class);
- private final PinotClientTransport _transport;
+ private final PinotClientTransport<?> _transport;
private final BrokerSelector _brokerSelector;
private final boolean _failOnExceptions;
- Connection(List<String> brokerList, PinotClientTransport transport) {
+ Connection(List<String> brokerList, PinotClientTransport<?> transport) {
this(new Properties(), new SimpleBrokerSelector(brokerList), transport);
}
- Connection(Properties properties, List<String> brokerList,
PinotClientTransport transport) {
+ Connection(Properties properties, List<String> brokerList,
PinotClientTransport<?> transport) {
this(properties, new SimpleBrokerSelector(brokerList), transport);
LOGGER.info("Created connection to broker list {}", brokerList);
}
- Connection(BrokerSelector brokerSelector, PinotClientTransport transport) {
+ Connection(BrokerSelector brokerSelector, PinotClientTransport<?> transport)
{
this(new Properties(), brokerSelector, transport);
}
- Connection(Properties properties, BrokerSelector brokerSelector,
PinotClientTransport transport) {
+ Connection(Properties properties, BrokerSelector brokerSelector,
PinotClientTransport<?> transport) {
_brokerSelector = brokerSelector;
_transport = transport;
@@ -150,7 +147,7 @@ public class Connection {
* @return A future containing the result of the query
* @throws PinotClientException If an exception occurs while processing the
query
*/
- public Future<ResultSetGroup> executeAsync(String query)
+ public CompletableFuture<ResultSetGroup> executeAsync(String query)
throws PinotClientException {
return executeAsync(null, query);
}
@@ -163,7 +160,7 @@ public class Connection {
* @throws PinotClientException If an exception occurs while processing the
query
*/
@Deprecated
- public Future<ResultSetGroup> executeAsync(Request request)
+ public CompletableFuture<ResultSetGroup> executeAsync(Request request)
throws PinotClientException {
return executeAsync(null, request.getQuery());
}
@@ -175,11 +172,14 @@ public class Connection {
* @return A future containing the result of the query
* @throws PinotClientException If an exception occurs while processing the
query
*/
- public Future<ResultSetGroup> executeAsync(@Nullable String tableName,
String query)
+ public CompletableFuture<ResultSetGroup> executeAsync(@Nullable String
tableName, String query)
throws PinotClientException {
String[] tableNames = (tableName == null) ? resolveTableName(query) : new
String[]{tableName};
String brokerHostPort = _brokerSelector.selectBroker(tableNames);
- return new
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
+ if (brokerHostPort == null) {
+ throw new PinotClientException("Could not find broker to query for
statement: " + query);
+ }
+ return _transport.executeQueryAsync(brokerHostPort,
query).thenApply(ResultSetGroup::new);
}
/**
@@ -220,43 +220,13 @@ public class Connection {
_brokerSelector.close();
}
- private static class ResultSetGroupFuture implements Future<ResultSetGroup> {
- private final Future<BrokerResponse> _responseFuture;
-
- public ResultSetGroupFuture(Future<BrokerResponse> responseFuture) {
- _responseFuture = responseFuture;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return _responseFuture.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return _responseFuture.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return _responseFuture.isDone();
- }
-
- @Override
- public ResultSetGroup get()
- throws InterruptedException, ExecutionException {
- try {
- return get(60000L, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- throw new ExecutionException(e);
- }
- }
-
- @Override
- public ResultSetGroup get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- BrokerResponse response = _responseFuture.get(timeout, unit);
- return new ResultSetGroup(response);
- }
+ /**
+ * Provides access to the underlying transport mechanism for this connection.
+ * There may be client metrics useful for monitoring and other observability
goals.
+ *
+ * @return pinot client transport.
+ */
+ public PinotClientTransport<?> getTransport() {
+ return _transport;
}
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index cd0e49139a..c87ae3fd80 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.client;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -28,8 +29,8 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
@@ -39,9 +40,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.ClientStats;
import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
import org.asynchttpclient.Dsl;
-import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
/**
* JSON encoded Pinot client transport over AsyncHttpClient.
*/
-public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport
{
+public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<ClientStats> {
private static final Logger LOGGER =
LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING =
"groupByMode=sql;responseFormat=sql";
@@ -66,7 +67,7 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
_headers = new HashMap<>();
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
- _httpClient = Dsl.asyncHttpClient();
+ _httpClient =
Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
}
public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String
scheme, String extraOptionString,
@@ -82,7 +83,8 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
builder.setSslContext(new JdkSslContext(sslContext, true,
ClientAuth.OPTIONAL));
}
- builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
+ builder.setRequestTimeout(_brokerReadTimeout)
+ .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua",
appId))
@@ -103,7 +105,8 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
builder.setSslContext(sslContext);
}
- builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
+ builder.setRequestTimeout(_brokerReadTimeout)
+ .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua",
appId))
@@ -122,7 +125,7 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String
query) {
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, String query) {
try {
ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("sql", query);
@@ -134,11 +137,23 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
if (_headers != null) {
_headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
}
+ LOGGER.debug("Sending query {} to {}", query, url);
+ return requestBuilder.addHeader("Content-Type", "application/json;
charset=utf-8").setBody(json.toString())
+ .execute().toCompletableFuture().thenApply(httpResponse -> {
+ LOGGER.debug("Completed query, HTTP status is {}",
httpResponse.getStatusCode());
+
+ if (httpResponse.getStatusCode() != 200) {
+ throw new PinotClientException(
+ "Pinot returned HTTP status " + httpResponse.getStatusCode()
+ ", expected 200");
+ }
- Future<Response> response =
- requestBuilder.addHeader("Content-Type", "application/json;
charset=utf-8").setBody(json.toString())
- .execute();
- return new BrokerResponseFuture(response, query, url,
_brokerReadTimeout);
+ String responseBody =
httpResponse.getResponseBody(StandardCharsets.UTF_8);
+ try {
+ return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
+ } catch (JsonProcessingException e) {
+ throw new CompletionException(e);
+ }
+ });
} catch (Exception e) {
throw new PinotClientException(e);
}
@@ -155,7 +170,7 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, Request request)
throws PinotClientException {
return executeQueryAsync(brokerAddress, request.getQuery());
}
@@ -173,60 +188,8 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
}
}
- private static class BrokerResponseFuture implements Future<BrokerResponse> {
- private final Future<Response> _response;
- private final String _query;
- private final String _url;
- private final long _brokerReadTimeout;
-
- public BrokerResponseFuture(Future<Response> response, String query,
String url, long brokerReadTimeout) {
- _response = response;
- _query = query;
- _url = url;
- _brokerReadTimeout = brokerReadTimeout;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return _response.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return _response.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return _response.isDone();
- }
-
- @Override
- public BrokerResponse get()
- throws ExecutionException {
- return get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public BrokerResponse get(long timeout, TimeUnit unit)
- throws ExecutionException {
- try {
- LOGGER.debug("Sending query {} to {}", _query, _url);
-
- Response httpResponse = _response.get(timeout, unit);
-
- LOGGER.debug("Completed query, HTTP status is {}",
httpResponse.getStatusCode());
-
- if (httpResponse.getStatusCode() != 200) {
- throw new PinotClientException(
- "Pinot returned HTTP status " + httpResponse.getStatusCode() +
", expected 200");
- }
-
- String responseBody =
httpResponse.getResponseBody(StandardCharsets.UTF_8);
- return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
- } catch (Exception e) {
- throw new ExecutionException(e);
- }
- }
+ @Override
+ public ClientStats getClientMetrics() {
+ return _httpClient.getClientStats();
}
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
index fbd398906a..9e4d2a6656 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PinotClientTransport.java
@@ -18,18 +18,18 @@
*/
package org.apache.pinot.client;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
* Interface for plugging different client transports.
*/
-public interface PinotClientTransport {
+public interface PinotClientTransport<METRICS> {
BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException;
- Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query)
+ CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress,
String query)
throws PinotClientException;
@Deprecated
@@ -37,9 +37,19 @@ public interface PinotClientTransport {
throws PinotClientException;
@Deprecated
- Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request
request)
+ CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
throws PinotClientException;
void close()
throws PinotClientException;
+
+ /**
+ * Access to the client metrics implementation if any.
+ * This may be useful for observability into the client implementation.
+ *
+ * @return underlying client metrics if any
+ */
+ default METRICS getClientMetrics() {
+ throw new UnsupportedOperationException("No useful client metrics
available");
+ }
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
index 314c94989b..600627f5fc 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/PreparedStatement.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.client;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
@@ -72,7 +72,7 @@ public class PreparedStatement {
*
* @return The query results
*/
- public Future<ResultSetGroup> executeAsync() {
+ public CompletableFuture<ResultSetGroup> executeAsync() {
return _connection.executeAsync(fillStatementWithParameters());
}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 037e8bc9af..5a755afc55 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -101,6 +101,19 @@ public class ConnectionFactoryTest {
Assert.assertEquals(connection.getBrokerList(), brokers);
}
+ @Test
+ public void testConnectionTransport() {
+ // Create properties
+ Properties properties = new Properties();
+ properties.setProperty("brokerList", "127.0.0.1:1234,localhost:2345");
+
+ // Create the connection
+ Connection connection = ConnectionFactory.fromProperties(properties);
+
+ Assert.assertNotNull(connection.getTransport());
+ Assert.assertNotNull(connection.getTransport().getClientMetrics());
+ }
+
// For testing DynamicBrokerSelector
/**
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
index f2ba5f5843..0da2a2f260 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.client;
import java.util.Collections;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -79,27 +79,21 @@ public class PreparedStatementTest {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
String query)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, String query)
throws PinotClientException {
- _lastBrokerAddress = brokerAddress;
- _lastQuery = query;
- return null;
+ return CompletableFuture.completedFuture(executeQuery(brokerAddress,
query));
}
@Override
public BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException {
- _lastBrokerAddress = brokerAddress;
- _lastQuery = request.getQuery();
- return BrokerResponse.empty();
+ return executeQuery(brokerAddress, request.getQuery());
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, Request request)
throws PinotClientException {
- _lastBrokerAddress = brokerAddress;
- _lastQuery = request.getQuery();
- return null;
+ return executeQueryAsync(brokerAddress, request.getQuery());
}
public String getLastQuery() {
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
index c513587696..91590a413c 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.client;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -165,7 +165,7 @@ public class ResultSetGroupTest {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
String query)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, String query)
throws PinotClientException {
return null;
}
@@ -177,7 +177,7 @@ public class ResultSetGroupTest {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, Request request)
throws PinotClientException {
return null;
}
diff --git
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
index 86ad814ffa..40a0dbd768 100644
---
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
+++
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/DummyPinotClientTransport.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.client;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
public class DummyPinotClientTransport implements PinotClientTransport {
@@ -32,7 +32,7 @@ public class DummyPinotClientTransport implements
PinotClientTransport {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String
query)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, String query)
throws PinotClientException {
_lastQuery = query;
return null;
@@ -46,7 +46,7 @@ public class DummyPinotClientTransport implements
PinotClientTransport {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, Request request)
throws PinotClientException {
_lastQuery = request.getQuery();
return null;
diff --git
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
index 7e92cc1040..abcddfbe9e 100644
---
a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
+++
b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
@@ -23,7 +23,7 @@ import java.sql.ResultSetMetaData;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.client.utils.DateTimeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -189,7 +189,7 @@ public class PinotResultSetTest {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
String query)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, String query)
throws PinotClientException {
return null;
}
@@ -201,7 +201,7 @@ public class PinotResultSetTest {
}
@Override
- public Future<BrokerResponse> executeQueryAsync(String brokerAddress,
Request request)
+ public CompletableFuture<BrokerResponse> executeQueryAsync(String
brokerAddress, Request request)
throws PinotClientException {
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]