This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f88a275 Add streaming query handler (#5717)
f88a275 is described below
commit f88a2759448b8d9eaeeb1b33b894e24df852b9c2
Author: Elon Azoulay <[email protected]>
AuthorDate: Tue Sep 8 18:12:49 2020 -0700
Add streaming query handler (#5717)
Add streaming query handler and a gRPC query server to stream the query
responses
Currently, streaming is only supported for selection-only queries
Added `GrpcRequestBuilder` and `GrpcQueryClient` to help build the request
and query the server
Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
---
.../pinot/common/exception/QueryException.java | 4 +
.../apache/pinot/common/utils/CommonConstants.java | 39 ++-
.../pinot/common/utils/grpc/GrpcQueryClient.java | 26 +-
.../common/utils/grpc/GrpcRequestBuilder.java | 106 +++++++
.../operator/blocks/IntermediateResultsBlock.java | 11 +-
.../operator/streaming/StreamingResponseUtils.java | 52 ++++
.../StreamingSelectionOnlyCombineOperator.java | 189 ++++++++++++
.../streaming/StreamingSelectionOnlyOperator.java | 111 +++++++
.../apache/pinot/core/plan/CombinePlanNode.java | 14 +-
.../core/plan/StreamingSelectionPlanNode.java | 55 ++++
.../core/plan/maker/InstancePlanMakerImplV2.java | 27 +-
.../apache/pinot/core/plan/maker/PlanMaker.java | 15 +
.../core/query/executor/GrpcQueryExecutor.java | 327 +++++++++++++++++++++
.../core/query/request/ServerQueryRequest.java | 69 ++++-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 38 ++-
.../combine/SelectionCombineOperatorTest.java | 2 +-
.../pinot/core/plan/CombinePlanNodeTest.java | 6 +-
.../MultiNodesOfflineClusterIntegrationTest.java | 14 +
.../tests/OfflineClusterIntegrationTest.java | 98 +++++-
.../org/apache/pinot/server/conf/ServerConf.java | 22 +-
.../pinot/server/starter/ServerInstance.java | 41 ++-
.../server/starter/grpc/PinotQueryService.java | 23 --
.../server/starter/helix/HelixServerStarter.java | 171 +++++------
23 files changed, 1255 insertions(+), 205 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 89dda00..10a1544 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -45,6 +45,7 @@ public class QueryException {
// TODO: Handle these errors in broker
public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
+ public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
public static final int BROKER_GATHER_ERROR_CODE = 300;
@@ -76,6 +77,8 @@ public class QueryException {
new ProcessingException(SERVER_SHUTTING_DOWN_ERROR_CODE);
public static final ProcessingException SERVER_OUT_OF_CAPACITY_ERROR =
new ProcessingException(SERVER_OUT_OF_CAPACITY_ERROR_CODE);
+ public static final ProcessingException SERVER_TABLE_MISSING_ERROR =
+ new ProcessingException(SERVER_TABLE_MISSING_ERROR_CODE);
public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
@@ -108,6 +111,7 @@ public class QueryException {
QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown");
SERVER_OUT_OF_CAPACITY_ERROR.setMessage("ServerOutOfCapacity");
+ SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index e116849..ac3a796 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -105,6 +105,7 @@ public class CommonConstants {
public static final String INSTANCE_ID_KEY = "instanceId";
public static final String DATA_DIR_KEY = "dataDir";
public static final String ADMIN_PORT_KEY = "adminPort";
+ public static final String GRPC_PORT_KEY = "grpcPort";
}
public static final String SET_INSTANCE_ID_TO_HOSTNAME_KEY =
"pinot.set.instance.id.to.hostname";
@@ -190,7 +191,12 @@ public class CommonConstants {
public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS =
"pinot.server.query.executor.class";
public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS =
"pinot.server.requestHandlerFactory.class";
public static final String CONFIG_OF_NETTY_PORT =
"pinot.server.netty.port";
+ public static final String CONFIG_OF_ENABLE_GRPC_SERVER =
"pinot.server.grpc.enable";
+ public static final boolean DEFAULT_ENABLE_GRPC_SERVER = false;
+ public static final String CONFIG_OF_GRPC_PORT = "pinot.server.grpc.port";
+ public static final int DEFAULT_GRPC_PORT = 8090;
public static final String CONFIG_OF_ADMIN_API_PORT =
"pinot.server.adminapi.port";
+ public static final int DEFAULT_ADMIN_API_PORT = 8097;
public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION =
"pinot.server.instance.segment.format.version";
public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT =
"pinot.server.instance.enable.split.commit";
@@ -211,7 +217,6 @@ public class CommonConstants {
"pinot.server.starter.realtimeConsumptionCatchupWaitMs";
public static final int
DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
- public static final int DEFAULT_ADMIN_API_PORT = 8097;
public static final String DEFAULT_READ_MODE = "mmap";
// Whether to reload consuming segment on scheme update. Will change
default behavior to true when this feature is stabilized
public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = false;
@@ -387,4 +392,36 @@ public class CommonConstants {
@Deprecated
public static final String TABLE_NAME = "segment.table.name";
}
+
+ public static class Query { // Constants for the metadata attached to query requests and responses
+ public static class Request {
+ public static class MetadataKeys { // Keys of the request metadata map
+ public static final String REQUEST_ID = "requestId";
+ public static final String BROKER_ID = "brokerId";
+ public static final String ENABLE_TRACE = "enableTrace";
+ public static final String ENABLE_STREAMING = "enableStreaming";
+ public static final String PAYLOAD_TYPE = "payloadType"; // Value is one of the PayloadType constants
+ }
+
+ public static class PayloadType { // Values stored under MetadataKeys.PAYLOAD_TYPE
+ public static final String SQL = "sql";
+ public static final String BROKER_REQUEST = "brokerRequest";
+ }
+ }
+
+ public static class Response {
+ public static class MetadataKeys { // Keys of the response metadata map
+ public static final String RESPONSE_TYPE = "responseType"; // Value is one of the ResponseType constants
+ }
+
+ public static class ResponseType {
+ // For streaming response, multiple (could be 0 if no data should be
returned, or query encounters exception)
+ // data responses will be returned, followed by one single metadata
response
+ public static final String DATA = "data";
+ public static final String METADATA = "metadata";
+ // For non-streaming response
+ public static final String NON_STREAMING = "nonStreaming";
+ }
+ }
+ }
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
similarity index 58%
rename from
pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 23cff7ad..68c03b8 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -16,20 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.starter.grpc;
+package org.apache.pinot.common.utils.grpc;
-import io.grpc.stub.StreamObserver;
+import io.grpc.Channel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Iterator;
import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server;
-/**
- * Handler for grpc server requests.
- * As data becomes available server responses will be added to the result
stream.
- * Once the request is complete the client will aggregate the result metadata.
- */
-public class PinotQueryHandler extends
PinotQueryServerGrpc.PinotQueryServerImplBase {
- @Override
- public void submit(Server.ServerRequest request,
StreamObserver<Server.ServerResponse> responseObserver) {
+public class GrpcQueryClient { // Client that submits query requests through the generated PinotQueryServer blocking stub
+ private final PinotQueryServerGrpc.PinotQueryServerBlockingStub
_blockingStub;
+
+ public GrpcQueryClient(String host, int port) {
+ // Set max inbound message size to 128MB; the channel is built with usePlaintext()
+ Channel channel =
+ ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(128
* 1024 * 1024).usePlaintext().build();
+ _blockingStub = PinotQueryServerGrpc.newBlockingStub(channel); // NOTE(review): the channel is kept only as Channel and is never shut down in this class - confirm whether a close/shutdown hook is needed
+ }
+
+ public Iterator<Server.ServerResponse> submit(Server.ServerRequest request) { // Returns the responses of the call as an iterator
+ return _blockingStub.submit(request);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
new file mode 100644
index 0000000..0d6daeb
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+
+public class GrpcRequestBuilder { // Fluent builder for Server.ServerRequest; every setter returns this builder
+ private int _requestId;
+ private String _brokerId = "unknown"; // Used when setBrokerId() is never called
+ private boolean _enableTrace;
+ private boolean _enableStreaming;
+ private String _payloadType; // Set by setSql() or setBrokerRequest(); whichever is called last decides the payload type
+ private String _sql;
+ private BrokerRequest _brokerRequest;
+ private List<String> _segments;
+
+ public GrpcRequestBuilder setRequestId(int requestId) {
+ _requestId = requestId;
+ return this;
+ }
+
+ public GrpcRequestBuilder setBrokerId(String brokerId) {
+ _brokerId = brokerId;
+ return this;
+ }
+
+ public GrpcRequestBuilder setEnableTrace(boolean enableTrace) {
+ _enableTrace = enableTrace;
+ return this;
+ }
+
+ public GrpcRequestBuilder setEnableStreaming(boolean enableStreaming) {
+ _enableStreaming = enableStreaming;
+ return this;
+ }
+
+ public GrpcRequestBuilder setSql(String sql) { // Also switches the payload type to SQL
+ _payloadType = Request.PayloadType.SQL;
+ _sql = sql;
+ return this;
+ }
+
+ public GrpcRequestBuilder setBrokerRequest(BrokerRequest brokerRequest) { // Also switches the payload type to BROKER_REQUEST
+ _payloadType = Request.PayloadType.BROKER_REQUEST;
+ _brokerRequest = brokerRequest;
+ return this;
+ }
+
+ public GrpcRequestBuilder setSegments(List<String> segments) {
+ _segments = segments;
+ return this;
+ }
+
+ public Server.ServerRequest build() { // Requires a query (SQL or broker request) and a non-empty segment list
+ Preconditions.checkState(_payloadType != null &&
CollectionUtils.isNotEmpty(_segments),
+ "Query and segmentsToQuery must be set");
+
+ Map<String, String> metadata = new HashMap<>(); // All metadata values are sent as strings
+ metadata.put(Request.MetadataKeys.REQUEST_ID,
Integer.toString(_requestId));
+ metadata.put(Request.MetadataKeys.BROKER_ID, _brokerId);
+ metadata.put(Request.MetadataKeys.ENABLE_TRACE,
Boolean.toString(_enableTrace));
+ metadata.put(Request.MetadataKeys.ENABLE_STREAMING,
Boolean.toString(_enableStreaming));
+ metadata.put(Request.MetadataKeys.PAYLOAD_TYPE, _payloadType);
+
+ if (_payloadType.equals(Request.PayloadType.SQL)) { // SQL goes into the sql field; a broker request goes into the binary payload
+ return
Server.ServerRequest.newBuilder().putAllMetadata(metadata).setSql(_sql).addAllSegments(_segments).build();
+ } else {
+ byte[] payLoad; // Broker request serialized with the Thrift compact protocol
+ try {
+ payLoad = new TSerializer(new
TCompactProtocol.Factory()).serialize(_brokerRequest);
+ } catch (TException e) {
+ throw new RuntimeException("Caught exception while serializing broker
request: " + _brokerRequest, e);
+ }
+ return
Server.ServerRequest.newBuilder().putAllMetadata(metadata).setPayload(ByteString.copyFrom(payLoad))
+ .addAllSegments(_segments).build();
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index d0eb2e2..79ac520 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -67,6 +67,9 @@ public class IntermediateResultsBlock implements Block {
private Table _table;
+ public IntermediateResultsBlock() {
+ }
+
/**
* Constructor for selection result.
*/
@@ -270,11 +273,7 @@ public class IntermediateResultsBlock implements Block {
return getAggregationGroupByResultDataTable();
}
- if (_processingExceptions != null && _processingExceptions.size() > 0) {
- return getProcessingExceptionsDataTable();
- }
-
- throw new UnsupportedOperationException("No data inside
IntermediateResultsBlock.");
+ return getMetadataDataTable();
}
private DataTable getResultDataTable()
@@ -405,7 +404,7 @@ public class IntermediateResultsBlock implements Block {
return attachMetadataToDataTable(dataTable);
}
- private DataTable getProcessingExceptionsDataTable() {
+ private DataTable getMetadataDataTable() {
return attachMetadataToDataTable(new DataTableImplV2());
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
new file mode 100644
index 0000000..3330342
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants.Query.Response;
+import org.apache.pinot.common.utils.DataTable;
+
+
+public class StreamingResponseUtils { // Static helpers that wrap a DataTable into a Server.ServerResponse tagged with a response type
+ private StreamingResponseUtils() { // Not meant to be instantiated
+ }
+
+ public static Server.ServerResponse getDataResponse(DataTable dataTable) // Response tagged as ResponseType.DATA
+ throws IOException {
+ return getResponse(dataTable, Response.ResponseType.DATA);
+ }
+
+ public static Server.ServerResponse getMetadataResponse(DataTable dataTable) // Response tagged as ResponseType.METADATA
+ throws IOException {
+ return getResponse(dataTable, Response.ResponseType.METADATA);
+ }
+
+ public static Server.ServerResponse getNonStreamingResponse(DataTable
dataTable) // Response tagged as ResponseType.NON_STREAMING
+ throws IOException {
+ return getResponse(dataTable, Response.ResponseType.NON_STREAMING);
+ }
+
+ private static Server.ServerResponse getResponse(DataTable dataTable, String
responseType) // Shared implementation: response type goes into the metadata, serialized table into the payload
+ throws IOException {
+ return
Server.ServerResponse.newBuilder().putMetadata(Response.MetadataKeys.RESPONSE_TYPE,
responseType)
+ .setPayload(ByteString.copyFrom(dataTable.toBytes())).build();
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
new file mode 100644
index 0000000..619bcc5
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for selection only streaming queries. Worker threads run the per-segment operators and enqueue their results blocks; the calling thread turns each block into a data table and sends it to the stream observer, then returns a block that carries only the execution statistics (or the exception).
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class StreamingSelectionOnlyCombineOperator extends
BaseOperator<IntermediateResultsBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class);
+ private static final String OPERATOR_NAME =
"StreamingSelectionOnlyCombineOperator";
+
+ // Special IntermediateResultsBlock to indicate that this is the last
results block for an operator
+ private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+ new IntermediateResultsBlock(new DataSchema(new String[0], new
DataSchema.ColumnDataType[0]),
+ Collections.emptyList());
+
+ private final List<Operator> _operators;
+ private final QueryContext _queryContext;
+ private final ExecutorService _executorService;
+ private final long _endTimeMs; // Compared against System.currentTimeMillis() when polling
+ private final StreamObserver<Server.ServerResponse> _streamObserver;
+ private final int _limit;
+
+ public StreamingSelectionOnlyCombineOperator(List<Operator> operators,
QueryContext queryContext,
+ ExecutorService executorService, long endTimeMs,
StreamObserver<Server.ServerResponse> streamObserver) {
+ _operators = operators;
+ _queryContext = queryContext;
+ _executorService = executorService;
+ _endTimeMs = endTimeMs;
+ _streamObserver = streamObserver;
+ _limit = queryContext.getLimit();
+ }
+
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
+ }
+
+ @Override
+ protected IntermediateResultsBlock getNextBlock() {
+ int numOperators = _operators.size();
+ int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+
+ // Use a BlockingQueue to store all the results blocks
+ BlockingQueue<IntermediateResultsBlock> blockingQueue = new
LinkedBlockingQueue<>();
+ // Use a Phaser to ensure all the Futures are done (not scheduled,
finished or interrupted) before the main thread
+ // returns. We need to ensure this because the main thread holds the
reference to the segments. If a segment is
+ // deleted/refreshed, the segment will be released after the main thread
returns, which would lead to undefined
+ // behavior (even JVM crash) when processing queries against it.
+ Phaser phaser = new Phaser(1);
+
+ Future[] futures = new Future[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ int threadIndex = i;
+ futures[i] = _executorService.submit(new TraceRunnable() {
+ @Override
+ public void runJob() {
+ try {
+ // Register the thread to the phaser
+ // NOTE: If the phaser is terminated (returning negative value)
when trying to register the thread, that
+ // means the query execution has finished, and the main
thread has deregistered itself and returned
+ // the result. Directly return as no execution result will
be taken.
+ if (phaser.register() < 0) {
+ return;
+ }
+
+ int numRowsCollected = 0; // Rows produced by this worker thread only
+ for (int operatorIndex = threadIndex; operatorIndex <
numOperators; operatorIndex += numThreads) { // This thread handles operators threadIndex, threadIndex + numThreads, ...
+ Operator<IntermediateResultsBlock> operator =
_operators.get(operatorIndex);
+ try {
+ IntermediateResultsBlock resultsBlock;
+ while ((resultsBlock = operator.nextBlock()) != null) {
+ Collection<Object[]> rows =
resultsBlock.getSelectionResult();
+ assert rows != null;
+ numRowsCollected += rows.size();
+ blockingQueue.offer(resultsBlock);
+ if (numRowsCollected >= _limit) { // This thread alone produced enough rows; stop without enqueueing the end marker
+ return;
+ }
+ }
+ blockingQueue.offer(LAST_RESULTS_BLOCK); // Tell the main thread that this operator is exhausted
+ } catch (EarlyTerminationException e) {
+ // Early-terminated by interruption (canceled by the main
thread)
+ return;
+ } catch (Exception e) {
+ // Caught exception, skip processing the remaining operators
+ LOGGER.error("Caught exception while executing operator of
index: {} (query: {})", operatorIndex,
+ _queryContext, e);
+ blockingQueue.offer(new IntermediateResultsBlock(e)); // Hand the exception to the main thread as a results block
+ return;
+ }
+ }
+ } finally {
+ phaser.arriveAndDeregister();
+ }
+ }
+ });
+ }
+
+ try {
+ int numRowsCollected = 0; // Rows streamed so far across all worker threads
+ int numOperatorsFinished = 0;
+ while (numRowsCollected < _limit && numOperatorsFinished < numOperators)
{
+ IntermediateResultsBlock resultsBlock =
+ blockingQueue.poll(_endTimeMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ if (resultsBlock == null) {
+ // Query times out, skip streaming the remaining results blocks
+ LOGGER.error("Timed out while polling results block (query: {})",
_queryContext);
+ return new
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+ new TimeoutException("Timed out while polling results block")));
+ }
+ if (resultsBlock.getProcessingExceptions() != null) {
+ // Caught exception while processing segment, skip streaming the
remaining results blocks and directly return
+ // the exception
+ return resultsBlock;
+ }
+ if (resultsBlock == LAST_RESULTS_BLOCK) { // Identity check against the shared end-marker instance
+ numOperatorsFinished++;
+ continue;
+ }
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ Collection<Object[]> rows = resultsBlock.getSelectionResult();
+ assert dataSchema != null && rows != null;
+ numRowsCollected += rows.size(); // The whole block is streamed below, so the total can exceed _limit
+ DataTable dataTable =
SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
+
_streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
+ }
+ IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock(); // Carries no rows; only the statistics set on the next line
+ CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators);
+ return metadataBlock;
+ } catch (Exception e) { // NOTE(review): this also catches InterruptedException from poll() without restoring the interrupt flag - confirm intended
+ LOGGER.error("Caught exception while streaming results blocks (query:
{})", _queryContext, e);
+ return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
+ } finally {
+ // Cancel all ongoing jobs
+ for (Future future : futures) {
+ if (!future.isDone()) {
+ future.cancel(true);
+ }
+ }
+ // Deregister the main thread and wait for all threads done
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
new file mode 100644
index 0000000..4bb991d
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+public class StreamingSelectionOnlyOperator extends
BaseOperator<IntermediateResultsBlock> { // Selection-only operator that returns the rows of one transform block per getNextBlock() call
+ private static final String OPERATOR_NAME = "StreamingSelectionOnlyOperator";
+
+ private final IndexSegment _indexSegment;
+ private final TransformOperator _transformOperator;
+ private final List<ExpressionContext> _expressions;
+ private final BlockValSet[] _blockValSets; // Reused across blocks; refilled on every getNextBlock() call
+ private final DataSchema _dataSchema;
+ private final int _limit;
+
+ private int _numDocsScanned = 0;
+
+ public StreamingSelectionOnlyOperator(IndexSegment indexSegment,
QueryContext queryContext,
+ List<ExpressionContext> expressions, TransformOperator
transformOperator) {
+ _indexSegment = indexSegment;
+ _transformOperator = transformOperator;
+ _expressions = expressions;
+
+ int numExpressions = expressions.size();
+ _blockValSets = new BlockValSet[numExpressions];
+ String[] columnNames = new String[numExpressions];
+ DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[numExpressions];
+ for (int i = 0; i < numExpressions; i++) { // Build the result schema: one column per expression, named by the expression string
+ ExpressionContext expression = expressions.get(i);
+ TransformResultMetadata expressionMetadata =
transformOperator.getResultMetadata(expression);
+ columnNames[i] = expression.toString();
+ columnDataTypes[i] =
+
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(),
expressionMetadata.isSingleValue());
+ }
+ _dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+ _limit = queryContext.getLimit();
+ }
+
+ @Nullable
+ @Override
+ protected IntermediateResultsBlock getNextBlock() { // Returns null once the limit is reached or the transform operator has no more blocks
+ if (_numDocsScanned >= _limit) {
+ // Already returned enough documents
+ return null;
+ }
+ TransformBlock transformBlock = _transformOperator.nextBlock();
+ if (transformBlock == null) {
+ return null;
+ }
+ int numExpressions = _expressions.size();
+ for (int i = 0; i < numExpressions; i++) {
+ _blockValSets[i] = transformBlock.getBlockValueSet(_expressions.get(i));
+ }
+ RowBasedBlockValueFetcher blockValueFetcher = new
RowBasedBlockValueFetcher(_blockValSets);
+ int numDocs = transformBlock.getNumDocs();
+ int numDocsToReturn = Math.min(_limit - _numDocsScanned, numDocs); // Trim the block so that at most _limit rows are returned in total
+ List<Object[]> rows = new ArrayList<>(numDocsToReturn);
+ for (int i = 0; i < numDocsToReturn; i++) {
+ rows.add(blockValueFetcher.getRow(i));
+ }
+ _numDocsScanned += numDocs; // Counts every document of the block, including those trimmed by the limit
+ return new IntermediateResultsBlock(_dataSchema, rows);
+ }
+
+ @Override
+ public String getOperatorName() {
+ return OPERATOR_NAME;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ long numEntriesScannedInFilter =
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+ long numEntriesScannedPostFilter = (long) _numDocsScanned *
_transformOperator.getNumColumnsProjected(); // Cast to long before multiplying so the product is computed in long arithmetic
+ int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter,
+ numTotalDocs);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index b6d9155..0bce7c0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.plan;
+import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -25,6 +26,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator;
@@ -32,6 +35,7 @@ import
org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
+import
org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator;
import org.apache.pinot.core.query.exception.BadQueryRequestException;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
@@ -56,6 +60,7 @@ public class CombinePlanNode implements PlanNode {
private final ExecutorService _executorService;
private final long _endTimeMs;
private final int _numGroupsLimit;
+ private final StreamObserver<Server.ServerResponse> _streamObserver;
/**
* Constructor for the class.
@@ -65,14 +70,16 @@ public class CombinePlanNode implements PlanNode {
* @param executorService Executor service
* @param endTimeMs End time in milliseconds for the query
* @param numGroupsLimit Limit of number of groups stored in each segment
+ * @param streamObserver Optional stream observer for streaming query
*/
public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext,
ExecutorService executorService,
- long endTimeMs, int numGroupsLimit) {
+ long endTimeMs, int numGroupsLimit, @Nullable
StreamObserver<Server.ServerResponse> streamObserver) {
_planNodes = planNodes;
_queryContext = queryContext;
_executorService = executorService;
_endTimeMs = endTimeMs;
_numGroupsLimit = numGroupsLimit;
+ // May be null: only a non-null observer selects the streaming combine operator later in this class
+ _streamObserver = streamObserver;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -155,6 +162,11 @@ public class CombinePlanNode implements PlanNode {
}
}
+ if (_streamObserver != null) {
+ // Streaming query (only support selection only)
+ return new StreamingSelectionOnlyCombineOperator(operators,
_queryContext, _executorService, _endTimeMs,
+ _streamObserver);
+ }
if (QueryContextUtils.isAggregationQuery(_queryContext)) {
if (_queryContext.getGroupByExpressions() == null) {
// Aggregation only
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
new file mode 100644
index 0000000..1ae28be
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * The {@code StreamingSelectionPlanNode} class provides the execution plan
for streaming selection query on a single
+ * segment.
+ * <p>NOTE: ORDER-BY is not supported for streaming selection query; the constructor rejects a query that carries
+ * order-by expressions instead of silently dropping them.
+ */
+public class StreamingSelectionPlanNode implements PlanNode {
+ private final IndexSegment _indexSegment;
+ private final QueryContext _queryContext;
+ private final List<ExpressionContext> _expressions;
+ private final TransformPlanNode _transformPlanNode;
+
+ /**
+ * Creates the plan node for one segment and eagerly builds the underlying transform plan node.
+ *
+ * @param indexSegment segment the selection runs on
+ * @param queryContext query to plan; must not have order-by expressions
+ * @throws IllegalStateException if the query has order-by expressions
+ */
+ public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext
queryContext) {
+ Preconditions
+ .checkState(queryContext.getOrderByExpressions() == null, "Selection
order-by is not supported for streaming");
+ _indexSegment = indexSegment;
+ _queryContext = queryContext;
+ _expressions = SelectionOperatorUtils.extractExpressions(queryContext,
indexSegment);
+ // The last argument is the smaller of the query limit and MAX_DOC_PER_CALL
+ // NOTE(review): presumably the max number of docs fetched per block - confirm against TransformPlanNode
+ _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext,
_expressions,
+ Math.min(queryContext.getLimit(), DocIdSetPlanNode.MAX_DOC_PER_CALL));
+ }
+
+ /** Runs the transform plan node and wraps the resulting operator in a {@link StreamingSelectionOnlyOperator}. */
+ @Override
+ public StreamingSelectionOnlyOperator run() {
+ return new StreamingSelectionOnlyOperator(_indexSegment, _queryContext,
_expressions, _transformPlanNode.run());
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 746b08a..5f6ea37 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -20,10 +20,12 @@ package org.apache.pinot.core.plan.maker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
@@ -36,6 +38,7 @@ import
org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.plan.SelectionPlanNode;
+import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.request.context.ExpressionContext;
@@ -103,7 +106,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, queryContext, executorService,
endTimeMs, _numGroupsLimit);
+ new CombinePlanNode(planNodes, queryContext, executorService,
endTimeMs, _numGroupsLimit, null);
return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}
@@ -139,6 +142,28 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
}
}
+ /**
+ * Builds the instance level plan for a streaming query: one streaming segment plan node per segment, combined by a
+ * {@link CombinePlanNode} that is given the stream observer.
+ */
+ @Override
+ public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments,
QueryContext queryContext,
+ ExecutorService executorService, StreamObserver<Server.ServerResponse>
streamObserver, long endTimeMs) {
+ List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+ for (IndexSegment indexSegment : indexSegments) {
+ planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext));
+ }
+ // Same wiring as the non-streaming instance plan, except that the observer is passed instead of null
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, queryContext, executorService,
endTimeMs, _numGroupsLimit, streamObserver);
+ return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+ }
+
+ /**
+ * Builds the segment level plan node for a streaming query. Only selection queries are handled.
+ *
+ * @throws UnsupportedOperationException if the query is an aggregation query
+ */
+ @Override
+ public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment,
QueryContext queryContext) {
+ if (QueryContextUtils.isAggregationQuery(queryContext)) {
+ throw new UnsupportedOperationException("Queries with aggregations are
not supported");
+ } else {
+ // Selection query
+ return new StreamingSelectionPlanNode(indexSegment, queryContext);
+ }
+ }
+
/**
* Returns {@code true} if the given aggregation-only without filter
QueryContext can be solved with segment metadata,
* {@code false} otherwise.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index b4a316d..9d5411a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.core.plan.maker;
+import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
@@ -43,4 +45,17 @@ public interface PlanMaker {
* Returns a segment level {@link PlanNode} which contains the logical
execution plan for one segment.
*/
PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext
queryContext);
+
+ /**
+ * Returns an instance level {@link Plan} for a streaming query which
contains the logical execution plan for multiple
+ * segments.
+ *
+ * @param indexSegments segments to build the plan for
+ * @param queryContext query to plan
+ * @param executorService executor service handed to the plan
+ * @param streamObserver stream observer for the streaming query
+ * @param endTimeMs end time in milliseconds for the query
+ */
+ Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments,
QueryContext queryContext,
+ ExecutorService executorService, StreamObserver<Server.ServerResponse>
streamObserver, long endTimeMs);
+
+ /**
+ * Returns a segment level {@link PlanNode} for a streaming query which
contains the logical execution plan for one
+ * segment.
+ *
+ * @param indexSegment segment to build the plan node for
+ * @param queryContext query to plan
+ */
+ PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment,
QueryContext queryContext);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
new file mode 100644
index 0000000..e64fd34
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.proto.PinotQueryServerGrpc;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Query executor for gRPC server requests.
+ * <ul>
+ * <li>
+ * For streaming request, multiple (could be 0 if no data should be
returned, or query encounters exception) data
+ * responses will be returned, followed by one single metadata response.
+ * </li>
+ * <li>
+ * For non-streaming request, one single response containing both data and
metadata will be returned.
+ * </li>
+ * </ul>
+ * TODO: Plug in QueryScheduler
+ */
+public class GrpcQueryExecutor extends
PinotQueryServerGrpc.PinotQueryServerImplBase {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcQueryExecutor.class);
+
+ private final InstanceDataManager _instanceDataManager;
+ private final ServerMetrics _serverMetrics;
+ private final long _defaultTimeOutMs;
+ private final SegmentPrunerService _segmentPrunerService;
+ private final PlanMaker _planMaker;
+ private final ExecutorService _executorService =
+
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+
+ /**
+ * Creates the executor: reads the query executor config, resolves the default query timeout, and sets up the
+ * segment pruner service and the plan maker.
+ *
+ * @param config configuration the {@link QueryExecutorConfig} is built from
+ * @param instanceDataManager data manager used to look up tables at query time
+ * @param serverMetrics metrics sink for query meters
+ * @throws ConfigurationException declared by this constructor
+ */
+ public GrpcQueryExecutor(PinotConfiguration config, InstanceDataManager
instanceDataManager,
+ ServerMetrics serverMetrics)
+ throws ConfigurationException {
+ _instanceDataManager = instanceDataManager;
+ _serverMetrics = serverMetrics;
+ QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
+ long defaultTimeoutMs = queryExecutorConfig.getTimeOut();
+ // A non-positive configured timeout falls back to the server-wide default
+ _defaultTimeOutMs =
+ defaultTimeoutMs > 0 ? defaultTimeoutMs :
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+ _segmentPrunerService = new
SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
+ _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
+ // NOTE(review): the message says "PinotQueryHandler" while this class is GrpcQueryExecutor - confirm the name
+ LOGGER.info("Initialized PinotQueryHandler with default timeout: {}ms,
numWorkerThreads: {}", _defaultTimeOutMs,
+ ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+ }
+
+ /**
+ * gRPC entry point: deserializes the request into a {@link ServerQueryRequest} and processes it.
+ * <p>A request that cannot be deserialized is answered with {@code INVALID_ARGUMENT}; an exception escaping
+ * {@code processQuery} is answered with {@code INTERNAL}. Both cases are metered.
+ *
+ * @param request incoming server request
+ * @param responseObserver observer the response(s) or the error are sent to
+ */
+ @Override
+ public void submit(Server.ServerRequest request,
StreamObserver<Server.ServerResponse> responseObserver) {
+ // Deserialize the request
+ ServerQueryRequest queryRequest;
+ try {
+ queryRequest = new ServerQueryRequest(request, _serverMetrics);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deserializing the request: {}",
request, e);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
1);
+ responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad
request").withCause(e).asException());
+ return;
+ }
+
+ // Process the query
+ try {
+ processQuery(queryRequest, responseObserver);
+ } catch (Exception e) {
+ // Last-resort handler: anything processQuery did not convert into an error data table ends up here
+ LOGGER.error("Caught exception while processing request {}: {} from
broker: {}", queryRequest.getRequestId(),
+ queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ }
+ }
+
+ /**
+ * Executes the query and sends the final response through {@code sendResponse}.
+ * <p>Steps, in order:
+ * <ol>
+ * <li>Stop the scheduler-wait timer (if present) and start the query-processing timer.</li>
+ * <li>Resolve the timeout: the instance default, overridden by the query option when one is set.</li>
+ * <li>Answer with an error data table if scheduling already exceeded the timeout or the table is missing.</li>
+ * <li>Acquire the segments, meter the missing ones and gather freshness stats of the consuming segments.</li>
+ * <li>Prune the segments, build the (streaming or regular) plan and execute it; an exception here is turned
+ * into a {@code QUERY_EXECUTION_ERROR} data table.</li>
+ * <li>Always release the segments and unregister the trace, then add the metadata and send the response.</li>
+ * </ol>
+ *
+ * @param queryRequest deserialized request
+ * @param streamObserver observer handed to the streaming plan and used for the final response
+ */
+ private void processQuery(ServerQueryRequest queryRequest,
StreamObserver<Server.ServerResponse> streamObserver) {
+ TimerContext timerContext = queryRequest.getTimerContext();
+ TimerContext.Timer schedulerWaitTimer =
timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
+ if (schedulerWaitTimer != null) {
+ schedulerWaitTimer.stopAndRecord();
+ }
+ long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
+ long querySchedulingTimeMs = System.currentTimeMillis() -
queryArrivalTimeMs;
+ TimerContext.Timer queryProcessingTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
+
+ long requestId = queryRequest.getRequestId();
+ String tableNameWithType = queryRequest.getTableNameWithType();
+ QueryContext queryContext = queryRequest.getQueryContext();
+ LOGGER.debug("Incoming request Id: {}, query: {}", requestId,
queryContext);
+ // Use the timeout passed from the request if exists, or the
instance-level timeout
+ long queryTimeoutMs = _defaultTimeOutMs;
+ Map<String, String> queryOptions = queryContext.getQueryOptions();
+ if (queryOptions != null) {
+ Long timeoutFromQueryOptions = QueryOptions.getTimeoutMs(queryOptions);
+ if (timeoutFromQueryOptions != null) {
+ queryTimeoutMs = timeoutFromQueryOptions;
+ }
+ }
+
+ // Query scheduler wait time already exceeds query timeout, directly return
+ if (querySchedulingTimeMs >= queryTimeoutMs) {
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
+ String errorMessage = String
+ .format("Query scheduling took %dms (longer than query timeout of
%dms)", querySchedulingTimeMs,
+ queryTimeoutMs);
+ DataTable dataTable = new DataTableImplV2();
+
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
+ LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
+ sendResponse(queryRequest, streamObserver, dataTable);
+ return;
+ }
+
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
+ if (tableDataManager == null) {
+ String errorMessage = "Failed to find table: " + tableNameWithType;
+ DataTable dataTable = new DataTableImplV2();
+
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
+ LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
+ sendResponse(queryRequest, streamObserver, dataTable);
+ return;
+ }
+
+ List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+ List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireSegments(segmentsToQuery);
+
+ // When segment is removed from the IdealState:
+ // 1. Controller schedules a state transition to server to turn segment
OFFLINE
+ // 2. Server gets the state transition, removes the segment data manager
and update its CurrentState
+ // 3. Controller gathers the CurrentState and update the ExternalView
+ // 4. Broker watches ExternalView change and updates the routing table to
stop querying the segment
+ //
+ // After step 2 but before step 4, segment will be missing on server side
+ // TODO: Change broker to watch both IdealState and ExternalView to not
query the removed segments
+ int numSegmentsQueried = segmentsToQuery.size();
+ int numSegmentsAcquired = segmentDataManagers.size();
+ if (numSegmentsQueried > numSegmentsAcquired) {
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_MISSING_SEGMENTS,
+ numSegmentsQueried - numSegmentsAcquired);
+ }
+
+ boolean enableTrace = queryRequest.isEnableTrace();
+ if (enableTrace) {
+ TraceContext.register(requestId);
+ }
+
+ int numConsumingSegmentsProcessed = 0;
+ long minIndexTimeMs = Long.MAX_VALUE;
+ long minIngestionTimeMs = Long.MAX_VALUE;
+ // gather stats for realtime consuming segments
+ for (SegmentDataManager segmentMgr : segmentDataManagers) {
+ if (segmentMgr.getSegment() instanceof MutableSegment) {
+ numConsumingSegmentsProcessed += 1;
+ SegmentMetadata metadata =
segmentMgr.getSegment().getSegmentMetadata();
+ long indexedTime = metadata.getLastIndexedTimestamp();
+ if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
+ // Keep the value that was just validated instead of reading the metadata a second time
+ minIndexTimeMs = indexedTime;
+ }
+ long ingestionTime = metadata.getLatestIngestionTimestamp();
+ if (ingestionTime != Long.MIN_VALUE && ingestionTime <
minIngestionTimeMs) {
+ minIngestionTimeMs = ingestionTime;
+ }
+ }
+ }
+
+ long minConsumingFreshnessTimeMs = minIngestionTimeMs;
+ if (numConsumingSegmentsProcessed > 0) {
+ if (minIngestionTimeMs == Long.MAX_VALUE) {
+ LOGGER.debug("Did not find valid ingestionTimestamp across consuming
segments! Using indexTime instead");
+ minConsumingFreshnessTimeMs = minIndexTimeMs;
+ }
+ LOGGER
+ .debug("Querying: {} consuming segments with
minConsumingFreshnessTimeMs: {}", numConsumingSegmentsProcessed,
+ minConsumingFreshnessTimeMs);
+ }
+
+ DataTable dataTable = null;
+ try {
+ // Compute total docs for the table before pruning the segments
+ long numTotalDocs = 0;
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ numTotalDocs +=
segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
+ }
+ TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+ segmentDataManagers = _segmentPrunerService.prune(tableDataManager,
segmentDataManagers, queryRequest);
+ segmentPruneTimer.stopAndRecord();
+ int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
+ LOGGER.debug("Matched {} segments after pruning",
numSegmentsMatchedAfterPruning);
+ if (numSegmentsMatchedAfterPruning == 0) {
+ // Nothing to execute: a streaming request gets a bare data table, a regular one an empty table for the query
+ dataTable =
+ queryRequest.isEnableStreaming() ? new DataTableImplV2() :
DataTableUtils.buildEmptyDataTable(queryContext);
+ Map<String, String> metadata = dataTable.getMetadata();
+ metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
+ metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY,
"0");
+ metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
"0");
+ metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+ metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ } else {
+ TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+ List<IndexSegment> indexSegments = new
ArrayList<>(numSegmentsMatchedAfterPruning);
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ indexSegments.add(segmentDataManager.getSegment());
+ }
+ // The deadline is measured from the arrival of the query, not from the start of the execution
+ long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
+ Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
+ .makeStreamingInstancePlan(indexSegments, queryContext,
_executorService, streamObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(indexSegments, queryContext,
_executorService, endTimeMs);
+ planBuildTimer.stopAndRecord();
+
+ TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+ dataTable = globalQueryPlan.execute();
+ planExecTimer.stopAndRecord();
+
+ // Update the total docs in the metadata based on un-pruned segments.
+ dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
Long.toString(numTotalDocs));
+ }
+ } catch (Exception e) {
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
+
+ // Do not log error for BadQueryRequestException because it's caused by
bad query
+ if (e instanceof BadQueryRequestException) {
+ LOGGER.info("Caught BadQueryRequestException while processing
requestId: {}, {}", requestId, e.getMessage());
+ } else {
+ LOGGER.error("Exception processing requestId {}", requestId, e);
+ }
+
+ dataTable = new DataTableImplV2();
+
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ if (enableTrace) {
+ if (dataTable != null) {
+ dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY,
TraceContext.getTraceInfo());
+ }
+ TraceContext.unregister();
+ }
+ }
+
+ queryProcessingTimer.stopAndRecord();
+ long queryProcessingTime = queryProcessingTimer.getDurationMs();
+ dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED,
Integer.toString(numSegmentsQueried));
+ dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY,
Long.toString(queryProcessingTime));
+
+ if (numConsumingSegmentsProcessed > 0) {
+ dataTable.getMetadata()
+ .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED,
Integer.toString(numConsumingSegmentsProcessed));
+ dataTable.getMetadata()
+ .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
Long.toString(minConsumingFreshnessTimeMs));
+ }
+
+ LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
+ LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
+
+ // TODO: Log query stats
+
+ sendResponse(queryRequest, streamObserver, dataTable);
+ }
+
+ /**
+ * Converts the data table into the final response and completes the stream.
+ * <p>A streaming request gets a metadata response, a non-streaming request gets the non-streaming response built
+ * from the data table. If building the response fails, the failure is logged and metered and the stream is
+ * terminated with {@code INTERNAL}.
+ *
+ * @param queryRequest request being answered
+ * @param streamObserver observer the response is sent to
+ * @param dataTable data table to build the response from
+ */
+ private void sendResponse(ServerQueryRequest queryRequest,
StreamObserver<Server.ServerResponse> streamObserver,
+ DataTable dataTable) {
+ Server.ServerResponse response;
+ try {
+ response = queryRequest.isEnableStreaming() ?
StreamingResponseUtils.getMetadataResponse(dataTable)
+ : StreamingResponseUtils.getNonStreamingResponse(dataTable);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while constructing response from data
table for request {}: {} from broker: {}",
+ queryRequest.getRequestId(), queryRequest.getQueryContext(),
queryRequest.getBrokerId(), e);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS,
1);
+ streamObserver.onError(Status.INTERNAL.withCause(e).asException());
+ return;
+ }
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 1763335..24f2fa2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -19,14 +19,20 @@
package org.apache.pinot.core.query.request;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TCompactProtocol;
/**
@@ -36,11 +42,14 @@ import
org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
* per segment basis.
*/
public class ServerQueryRequest {
+ private static final CalciteSqlCompiler SQL_COMPILER = new
CalciteSqlCompiler();
+
private final long _requestId;
+ private final String _brokerId;
+ private final boolean _enableTrace;
+ private final boolean _enableStreaming;
private final String _tableNameWithType;
private final List<String> _segmentsToQuery;
- private final boolean _enableTrace;
- private final String _brokerId;
// Timing information for different phases of query execution
private final TimerContext _timerContext;
@@ -51,11 +60,12 @@ public class ServerQueryRequest {
public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics
serverMetrics, long queryArrivalTimeMs) {
_requestId = instanceRequest.getRequestId();
+ _brokerId = instanceRequest.getBrokerId() != null ?
instanceRequest.getBrokerId() : "unknown";
+ _enableTrace = instanceRequest.isEnableTrace();
+ _enableStreaming = false;
BrokerRequest brokerRequest = instanceRequest.getQuery();
_tableNameWithType = brokerRequest.getQuerySource().getTableName();
_segmentsToQuery = instanceRequest.getSearchSegments();
- _enableTrace = instanceRequest.isEnableTrace();
- _brokerId = instanceRequest.getBrokerId() != null ?
instanceRequest.getBrokerId() : "unknown";
_timerContext = new TimerContext(_tableNameWithType, serverMetrics,
queryArrivalTimeMs);
// Pre-compute segment independent information
@@ -63,24 +73,59 @@ public class ServerQueryRequest {
_allColumns = QueryContextUtils.getAllColumns(_queryContext);
}
- public long getRequestId() {
- return _requestId;
+ /**
+ * Builds the request from a gRPC {@link Server.ServerRequest}.
+ * <p>Request id, broker id, trace flag and streaming flag are read from the request metadata; the query is either
+ * compiled from SQL or deserialized from a compact-protocol Thrift {@link BrokerRequest}, depending on the payload
+ * type (SQL when the metadata does not name one).
+ *
+ * @param serverRequest gRPC request
+ * @param serverMetrics metrics passed to the {@link TimerContext}
+ * @throws UnsupportedOperationException if the payload type is neither SQL nor broker request
+ * @throws Exception if parsing the metadata or the payload fails
+ */
+ public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics
serverMetrics)
+ throws Exception {
+ // The arrival time is taken here, when the request object is constructed
+ long queryArrivalTimeMs = System.currentTimeMillis();
+
+ Map<String, String> metadata = serverRequest.getMetadataMap();
+ _requestId =
Long.parseLong(metadata.getOrDefault(Request.MetadataKeys.REQUEST_ID, "0"));
+ _brokerId = metadata.getOrDefault(Request.MetadataKeys.BROKER_ID,
"unknown");
+ // Boolean.parseBoolean(null) is false, so a missing key leaves the flag disabled
+ _enableTrace =
Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_TRACE));
+ _enableStreaming =
Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING));
+
+ BrokerRequest brokerRequest;
+ String payloadType =
metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE,
Request.PayloadType.SQL);
+ if (payloadType.equalsIgnoreCase(Request.PayloadType.SQL)) {
+ brokerRequest =
SQL_COMPILER.compileToBrokerRequest(serverRequest.getSql());
+ } else if
(payloadType.equalsIgnoreCase(Request.PayloadType.BROKER_REQUEST)) {
+ brokerRequest = new BrokerRequest();
+ new TDeserializer(new TCompactProtocol.Factory())
+ .deserialize(brokerRequest,
serverRequest.getPayload().toByteArray());
+ } else {
+ throw new UnsupportedOperationException("Unsupported payloadType: " +
payloadType);
+ }
+
+ _tableNameWithType = brokerRequest.getQuerySource().getTableName();
+ _segmentsToQuery = serverRequest.getSegmentsList();
+ _timerContext = new TimerContext(_tableNameWithType, serverMetrics,
queryArrivalTimeMs);
+
+ // Pre-compute segment independent information
+ _queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ _allColumns = QueryContextUtils.getAllColumns(_queryContext);
}
- public String getTableNameWithType() {
- return _tableNameWithType;
+ public long getRequestId() {
+ return _requestId;
}
- public List<String> getSegmentsToQuery() {
- return _segmentsToQuery;
+ public String getBrokerId() {
+ return _brokerId;
}
public boolean isEnableTrace() {
return _enableTrace;
}
- public String getBrokerId() {
- return _brokerId;
+ public boolean isEnableStreaming() {
+ return _enableStreaming;
+ }
+
+ public String getTableNameWithType() {
+ return _tableNameWithType;
+ }
+
+ public List<String> getSegmentsToQuery() {
+ return _segmentsToQuery;
}
public TimerContext getTimerContext() {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
similarity index 54%
rename from
pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index e0fef72..7413070 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -16,28 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.conf;
+package org.apache.pinot.core.transport.grpc;
-import java.util.Optional;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import java.io.IOException;
+import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.spi.env.PinotConfiguration;
+public class GrpcQueryServer {
+ private final Server _server;
-public class NettyServerConfig {
- private static String NETTY_SERVER_PORT = "port";
-
- private final int port;
+ public GrpcQueryServer(int port, GrpcQueryExecutor queryExecutor) {
+ _server = ServerBuilder.forPort(port).addService(queryExecutor).build();
+ }
- public NettyServerConfig(PinotConfiguration serverNettyConfig) throws ConfigurationException {
- this.port = Optional.ofNullable(serverNettyConfig.getProperty(NETTY_SERVER_PORT, Integer.class))
- .orElseThrow(() -> new ConfigurationException("Cannot find Key : " + NETTY_SERVER_PORT));
+ public void start() {
+ try {
+ _server.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- /**
- * @return Netty server port
- */
- public int getPort() {
- return port;
+ public void shutdown() {
+ try {
+ _server.shutdown().awaitTermination();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index 5ee3da2..e45b441 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -230,7 +230,7 @@ public class SelectionCombineOperatorTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes,
queryContext, EXECUTOR,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
return combinePlanNode.run().nextBlock();
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index b0d5d13..11053c7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -58,7 +58,7 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes,
_queryContext, _executorService,
System.currentTimeMillis() +
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
combinePlanNode.run();
Assert.assertEquals(numPlans, count.get());
}
@@ -83,7 +83,7 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode =
new CombinePlanNode(planNodes, _queryContext, _executorService,
System.currentTimeMillis() + 100,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
@@ -105,7 +105,7 @@ public class CombinePlanNodeTest {
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes,
_queryContext, _executorService,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
- InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index c66a6f2..c1a3a85 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.integration.tests;
+import org.testng.annotations.Test;
+
+
/**
* Integration test that extends OfflineClusterIntegrationTest but start
multiple brokers and servers.
*/
@@ -34,4 +37,15 @@ public class MultiNodesOfflineClusterIntegrationTest extends
OfflineClusterInteg
protected int getNumServers() {
return NUM_SERVERS;
}
+
+ @Override
+ protected void startServers() {
+ startServers(NUM_SERVERS);
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testGrpcQueryServer() {
+ // Ignored
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7475715..790968a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -30,21 +30,31 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -130,7 +140,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
startZk();
startController();
startBrokers(getNumBrokers());
- startServers(getNumServers());
+ startServers();
// Create and upload the schema and table config
Schema schema = createSchema();
@@ -160,6 +170,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
waitForAllDocsLoaded(600_000L);
}
+ protected void startServers() {
+ // Enable gRPC server
+ PinotConfiguration serverConfig = getDefaultServerConfiguration();
+ serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+ startServer(serverConfig);
+ }
+
private void registerCallbackHandlers() {
List<String> instances =
_helixAdmin.getInstancesInCluster(getHelixClusterName());
instances.removeIf(instance ->
(!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) &&
!instance
@@ -973,15 +990,18 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
public void testQueryWithSameAlias()
throws Exception {
//test repeated columns in selection query
- String query = "SELECT ArrTime AS ArrTime, Carrier AS Carrier,
DaysSinceEpoch AS DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
+ String query =
+ "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS
DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
testQuery(query, Collections.singletonList(query));
//test repeated columns in selection query
- query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch,
Carrier AS Carrier FROM mytable ORDER BY Carrier DESC";
+ query =
+ "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier
AS Carrier FROM mytable ORDER BY Carrier DESC";
testQuery(query, Collections.singletonList(query));
//test repeated columns in selection query
- query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch,
Carrier AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
+ query =
+ "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier
AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
testQuery(query, Collections.singletonList(query));
}
@@ -1124,11 +1144,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
sql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier,
DestAirportID";
testSqlQuery(pql, Collections.singletonList(sql));
- pql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY
Carrier, DestAirportID, DestStateName LIMIT 1000000";
+ pql =
+ "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY
Carrier, DestAirportID, DestStateName LIMIT 1000000";
sql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY
Carrier, DestAirportID, DestStateName";
testSqlQuery(pql, Collections.singletonList(sql));
- pql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY
Carrier, DestAirportID, DestCityName LIMIT 1000000";
+ pql =
+ "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY
Carrier, DestAirportID, DestCityName LIMIT 1000000";
sql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY
Carrier, DestAirportID, DestCityName";
testSqlQuery(pql, Collections.singletonList(sql));
@@ -1160,7 +1182,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"SELECT COUNT(*) FROM mytable GROUP BY
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
- baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+ baseQueries
+ .forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
for (String query : queries) {
try {
@@ -1213,8 +1236,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM
mytable",
"SELECT COUNT(*) FROM mytable GROUP BY
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
List<String> queries = new ArrayList<>();
- baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
- baseQueries.forEach(q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+ baseQueries
+ .forEach(q -> queries.add(q.replace("mytable",
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+ baseQueries.forEach(
+ q -> queries.add(q.replace("mytable",
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
for (String query : queries) {
try {
@@ -1279,6 +1304,61 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
query = "SELECT c_o_u_n_t(FlightNum) FROM mytable ";
assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(),
115545);
assertEquals(postSqlQuery(query,
_brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(),
115545);
+ }
+
+ @Test
+ public void testGrpcQueryServer()
+ throws Exception {
+ GrpcQueryClient queryClient = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT);
+ String sql = "SELECT * FROM mytable_OFFLINE LIMIT 1000000";
+ BrokerRequest brokerRequest = new Pql2Compiler().compileToBrokerRequest(sql);
+ List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE");
+
+ GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder().setSegments(segments);
+ testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build()));
+ testNonStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));
+
+ requestBuilder.setEnableStreaming(true);
+ testStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build()));
+ testStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));
+ }
+
+ private void testNonStreamingRequest(Iterator<Server.ServerResponse> nonStreamingResponses)
+ throws Exception {
+ int expectedNumDocs = (int) getCountStarResult();
+ assertTrue(nonStreamingResponses.hasNext());
+ Server.ServerResponse nonStreamingResponse = nonStreamingResponses.next();
+ assertEquals(nonStreamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE),
+ CommonConstants.Query.Response.ResponseType.NON_STREAMING);
+ DataTable dataTable = DataTableFactory.getDataTable(nonStreamingResponse.getPayload().asReadOnlyByteBuffer());
+ assertNotNull(dataTable.getDataSchema());
+ assertEquals(dataTable.getNumberOfRows(), expectedNumDocs);
+ Map<String, String> metadata = dataTable.getMetadata();
+ assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs));
+ }
+ private void testStreamingRequest(Iterator<Server.ServerResponse> streamingResponses)
+ throws Exception {
+ int expectedNumDocs = (int) getCountStarResult();
+ int numTotalDocs = 0;
+ while (streamingResponses.hasNext()) {
+ Server.ServerResponse streamingResponse = streamingResponses.next();
+ DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer());
+ String responseType =
+ streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
+ if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
+ assertTrue(dataTable.getMetadata().isEmpty());
+ assertNotNull(dataTable.getDataSchema());
+ numTotalDocs += dataTable.getNumberOfRows();
+ } else {
+ assertEquals(responseType, CommonConstants.Query.Response.ResponseType.METADATA);
+ assertFalse(streamingResponses.hasNext());
+ assertEquals(numTotalDocs, expectedNumDocs);
+ assertNull(dataTable.getDataSchema());
+ assertEquals(dataTable.getNumberOfRows(), 0);
+ Map<String, String> metadata = dataTable.getMetadata();
+ assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs));
+ }
+ }
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index e7c9a97..1b5087f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -20,9 +20,8 @@ package org.apache.pinot.server.conf;
import java.util.Arrays;
import java.util.List;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -30,6 +29,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
* The config used for Server.
*/
public class ServerConf {
+ // TODO: Replace with constants in CommonConstants
private static final String PINOT_ = "pinot.";
private static final String PINOT_SERVER_INSTANCE = "pinot.server.instance";
private static final String PINOT_SERVER_METRICS = "pinot.server.metrics";
@@ -37,7 +37,6 @@ public class ServerConf {
private static final String PINOT_SERVER_TABLE_LEVEL_METRICS =
"pinot.server.enableTableLevelMetrics";
private static final String PINOT_SERVER_QUERY =
"pinot.server.query.executor";
private static final String PINOT_SERVER_REQUEST = "pinot.server.request";
- private static final String PINOT_SERVER_NETTY = "pinot.server.netty";
private static final String PINOT_SERVER_INSTANCE_DATA_MANAGER_CLASS =
"pinot.server.instance.data.manager.class";
private static final String PINOT_SERVER_QUERY_EXECUTOR_CLASS =
"pinot.server.query.executor.class";
private static final String PINOT_SERVER_TRANSFORM_FUNCTIONS =
"pinot.server.transforms";
@@ -70,9 +69,16 @@ public class ServerConf {
return _serverConf.subset(PINOT_SERVER_METRICS);
}
- public NettyServerConfig getNettyConfig()
- throws ConfigurationException {
- return new NettyServerConfig(_serverConf.subset(PINOT_SERVER_NETTY));
+ public int getNettyPort() {
+ return _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT,
Helix.DEFAULT_SERVER_NETTY_PORT);
+ }
+
+ public boolean isEnableGrpcServer() {
+ return _serverConf.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER,
Server.DEFAULT_ENABLE_GRPC_SERVER);
+ }
+
+ public int getGrpcPort() {
+ return _serverConf.getProperty(Server.CONFIG_OF_GRPC_PORT,
Server.DEFAULT_GRPC_PORT);
}
public PinotConfiguration getConfig(String component) {
@@ -104,6 +110,6 @@ public class ServerConf {
}
public String getMetricsPrefix() {
- return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX,
CommonConstants.Server.DEFAULT_METRICS_PREFIX);
+ return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX,
Server.DEFAULT_METRICS_PREFIX);
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ae448ad..fa1903c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -30,11 +30,14 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import
org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
+import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
import org.apache.pinot.core.transport.QueryServer;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +54,8 @@ public class ServerInstance {
private final QueryExecutor _queryExecutor;
private final LongAccumulator _latestQueryTime;
private final QueryScheduler _queryScheduler;
- private final QueryServer _queryServer;
+ private final QueryServer _nettyQueryServer;
+ private final GrpcQueryServer _grpcQueryServer;
private boolean _started = false;
@@ -77,16 +81,27 @@ public class ServerInstance {
String queryExecutorClassName = serverConf.getQueryExecutorClassName();
LOGGER.info("Initializing query executor of class: {}",
queryExecutorClassName);
_queryExecutor = (QueryExecutor)
Class.forName(queryExecutorClassName).newInstance();
- _queryExecutor.init(serverConf.getQueryExecutorConfig(),
_instanceDataManager, _serverMetrics);
+ PinotConfiguration queryExecutorConfig =
serverConf.getQueryExecutorConfig();
+ _queryExecutor.init(queryExecutorConfig, _instanceDataManager,
_serverMetrics);
LOGGER.info("Initializing query scheduler");
_latestQueryTime = new LongAccumulator(Long::max, 0);
_queryScheduler =
QuerySchedulerFactory.create(serverConf.getSchedulerConfig(),
_queryExecutor, _serverMetrics, _latestQueryTime);
- int queryServerPort = serverConf.getNettyConfig().getPort();
- LOGGER.info("Initializing query server on port: {}", queryServerPort);
- _queryServer = new QueryServer(queryServerPort, _queryScheduler,
_serverMetrics);
+ int nettyPort = serverConf.getNettyPort();
+ LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
+ _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler,
_serverMetrics);
+
+ if (serverConf.isEnableGrpcServer()) {
+ int grpcPort = serverConf.getGrpcPort();
+ LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
+ GrpcQueryExecutor grpcQueryExecutor =
+ new GrpcQueryExecutor(queryExecutorConfig, _instanceDataManager,
_serverMetrics);
+ _grpcQueryServer = new GrpcQueryServer(grpcPort, grpcQueryExecutor);
+ } else {
+ _grpcQueryServer = null;
+ }
LOGGER.info("Initializing transform functions");
Set<Class<TransformFunction>> transformFunctionClasses = new HashSet<>();
@@ -119,8 +134,12 @@ public class ServerInstance {
_queryExecutor.start();
LOGGER.info("Starting query scheduler");
_queryScheduler.start();
- LOGGER.info("Starting query server");
- _queryServer.start();
+ LOGGER.info("Starting Netty query server");
+ _nettyQueryServer.start();
+ if (_grpcQueryServer != null) {
+ LOGGER.info("Starting gRPC query server");
+ _grpcQueryServer.start();
+ }
_started = true;
LOGGER.info("Finish starting server instance");
@@ -130,8 +149,12 @@ public class ServerInstance {
Preconditions.checkState(_started, "Server instance is not running");
LOGGER.info("Shutting down server instance");
- LOGGER.info("Shutting down query server");
- _queryServer.shutDown();
+ if (_grpcQueryServer != null) {
+ LOGGER.info("Shutting down gRPC query server");
+ _grpcQueryServer.shutdown();
+ }
+ LOGGER.info("Shutting down Netty query server");
+ _nettyQueryServer.shutDown();
LOGGER.info("Shutting down query scheduler");
_queryScheduler.stop();
LOGGER.info("Shutting down query executor");
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
deleted file mode 100644
index bee45de..0000000
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.grpc;
-
-public class PinotQueryService {
-
-}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index abe2275..b419d83 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -18,54 +18,17 @@
*/
package org.apache.pinot.server.starter.helix;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.ACCESS_CONTROL_FACTORY_CLASS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
-import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -85,9 +48,10 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.Instance;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel;
+import org.apache.pinot.common.utils.CommonConstants.Server;
import
org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -110,9 +74,6 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
/**
* Starter for Pinot server.
@@ -142,6 +103,7 @@ public class HelixServerStarter implements ServiceStartable
{
private final String _host;
private final int _port;
private final String _instanceId;
+ private final HelixConfigScope _instanceConfigScope;
private HelixManager _helixManager;
private HelixAdmin _helixAdmin;
private ServerInstance _serverInstance;
@@ -155,23 +117,21 @@ public class HelixServerStarter implements
ServiceStartable {
// Make a clone so that changes to the config won't propagate to the caller
_serverConf = serverConf.clone();
- _host = _serverConf.getProperty(KEY_OF_SERVER_NETTY_HOST,
-
_serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY,
false) ? NetUtil
- .getHostnameOrAddress() : NetUtil.getHostAddress());
- _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT,
DEFAULT_SERVER_NETTY_PORT);
+ _host = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
+ _serverConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)
? NetUtil.getHostnameOrAddress()
+ : NetUtil.getHostAddress());
+ _port = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT,
Helix.DEFAULT_SERVER_NETTY_PORT);
- _instanceId =
Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID))
-
- // InstanceId is not configured. Fallback to an auto generated config.
- .orElseGet(this::initializeDefaultInstanceId);
- }
-
- private String initializeDefaultInstanceId() {
- String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
-
- _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, instanceId);
+ String instanceId = _serverConf.getProperty(Server.CONFIG_OF_INSTANCE_ID);
+ if (instanceId == null) {
+ instanceId = Helix.PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
+ _serverConf.addProperty(Server.CONFIG_OF_INSTANCE_ID, instanceId);
+ }
+ _instanceId = instanceId;
- return instanceId;
+ _instanceConfigScope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT,
_helixClusterName).forParticipant(_instanceId)
+ .build();
}
/**
@@ -179,9 +139,11 @@ public class HelixServerStarter implements
ServiceStartable {
*/
private void registerServiceStatusHandler() {
double minResourcePercentForStartup = _serverConf
- .getProperty(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
- int realtimeConsumptionCatchupWaitMs =
_serverConf.getProperty(CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
- DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+ .getProperty(Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
+ Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
+ int realtimeConsumptionCatchupWaitMs = _serverConf
+
.getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
+ Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
@@ -235,25 +197,6 @@ public class HelixServerStarter implements ServiceStartable {
new
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}
- private void setAdminApiPort(int adminApiPort) {
- Map<String, String> propToUpdate = new HashMap<>();
- propToUpdate.put(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
- updateInstanceConfigInHelix(propToUpdate);
- }
-
- private void setShuttingDownStatus(boolean shuttingDownStatus) {
- Map<String, String> propToUpdate = new HashMap<>();
- propToUpdate.put(IS_SHUTDOWN_IN_PROGRESS,
String.valueOf(shuttingDownStatus));
- updateInstanceConfigInHelix(propToUpdate);
- }
-
- private void updateInstanceConfigInHelix(Map<String, String> props) {
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT,
_helixClusterName).forParticipant(_instanceId)
- .build();
- _helixAdmin.setConfig(scope, props);
- }
-
private void updateInstanceConfigIfNeeded(String host, int port) {
InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(_helixClusterName, _instanceId);
boolean needToUpdateInstanceConfig = false;
@@ -265,7 +208,7 @@ public class HelixServerStarter implements ServiceStartable {
instanceConfig.addTag(TagNameUtils.getOfflineTagForTenant(null));
instanceConfig.addTag(TagNameUtils.getRealtimeTagForTenant(null));
} else {
- instanceConfig.addTag(UNTAGGED_SERVER_INSTANCE);
+ instanceConfig.addTag(Helix.UNTAGGED_SERVER_INSTANCE);
}
needToUpdateInstanceConfig = true;
}
@@ -303,7 +246,7 @@ public class HelixServerStarter implements ServiceStartable {
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
- _serverConf.getProperty(CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS,
DEFAULT_FLAPPING_TIME_WINDOW_MS));
+ _serverConf.getProperty(Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
/**
@@ -314,8 +257,8 @@ public class HelixServerStarter implements ServiceStartable {
private void startupServiceStatusCheck(long endTimeMs) {
LOGGER.info("Starting startup service status check");
long startTimeMs = System.currentTimeMillis();
- long checkIntervalMs = _serverConf
- .getProperty(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS,
DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
+ long checkIntervalMs =
_serverConf.getProperty(Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS,
+ Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
while (System.currentTimeMillis() < endTimeMs) {
Status serviceStatus = ServiceStatus.getServiceStatus();
@@ -370,7 +313,8 @@ public class HelixServerStarter implements ServiceStartable {
_serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
- SegmentFetcherAndLoader fetcherAndLoader = new
SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
+ SegmentFetcherAndLoader fetcherAndLoader =
+ new SegmentFetcherAndLoader(_serverConf, instanceDataManager,
serverMetrics);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager, fetcherAndLoader);
_helixManager.getStateMachineEngine()
@@ -386,7 +330,7 @@ public class HelixServerStarter implements ServiceStartable {
// Start restlet server for admin API endpoint
String accessControlFactoryClass =
- _serverConf.getProperty(ACCESS_CONTROL_FACTORY_CLASS,
DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
+ _serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS,
Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
LOGGER.info("Using class: {} as the AccessControlFactory",
accessControlFactoryClass);
final AccessControlFactory accessControlFactory;
try {
@@ -397,10 +341,20 @@ public class HelixServerStarter implements ServiceStartable {
+ "'", e);
}
- int adminApiPort = _serverConf.getProperty(CONFIG_OF_ADMIN_API_PORT,
DEFAULT_ADMIN_API_PORT);
+ // Update admin API port
+ int adminApiPort =
_serverConf.getProperty(Server.CONFIG_OF_ADMIN_API_PORT,
Server.DEFAULT_ADMIN_API_PORT);
_adminApiApplication = new AdminApiApplication(_serverInstance,
accessControlFactory);
_adminApiApplication.start(adminApiPort);
- setAdminApiPort(adminApiPort);
+ _helixAdmin.setConfig(_instanceConfigScope,
+ Collections.singletonMap(Instance.ADMIN_PORT_KEY,
String.valueOf(adminApiPort)));
+
+ // Update gRPC port
+ if (serverInstanceConfig.isEnableGrpcServer()) {
+ _helixAdmin.setConfig(_instanceConfigScope,
+ Collections.singletonMap(Instance.GRPC_PORT_KEY,
String.valueOf(serverInstanceConfig.getGrpcPort())));
+ } else {
+ _helixAdmin.removeConfig(_instanceConfigScope,
Collections.singletonList(Instance.GRPC_PORT_KEY));
+ }
// Register message handler factory
SegmentMessageHandlerFactory messageHandlerFactory =
@@ -408,19 +362,21 @@ public class HelixServerStarter implements ServiceStartable {
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
messageHandlerFactory);
- serverMetrics.addCallbackGauge(INSTANCE_CONNECTED_METRIC_NAME, () ->
_helixManager.isConnected() ? 1L : 0L);
+ serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () ->
_helixManager.isConnected() ? 1L : 0L);
_helixManager
.addPreConnectCallback(() ->
serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
// Register the service status handler
registerServiceStatusHandler();
- if (_serverConf
- .getProperty(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK,
DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
- long endTimeMs = startTimeMs +
_serverConf.getProperty(CONFIG_OF_STARTUP_TIMEOUT_MS,
DEFAULT_STARTUP_TIMEOUT_MS);
+ if (_serverConf.getProperty(Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK,
+ Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
+ long endTimeMs =
+ startTimeMs +
_serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS,
Server.DEFAULT_STARTUP_TIMEOUT_MS);
startupServiceStatusCheck(endTimeMs);
}
- setShuttingDownStatus(false);
+ _helixAdmin.setConfig(_instanceConfigScope,
+ Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS,
Boolean.toString(false)));
LOGGER.info("Pinot server ready");
// Create metrics for mmap stuff
@@ -446,15 +402,19 @@ public class HelixServerStarter implements ServiceStartable {
LOGGER.warn("Caught exception closing PinotFS classes", e);
}
_adminApiApplication.stop();
- setShuttingDownStatus(true);
+ _helixAdmin.setConfig(_instanceConfigScope,
+ Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS,
Boolean.toString(true)));
- long endTimeMs = startTimeMs +
_serverConf.getProperty(CONFIG_OF_SHUTDOWN_TIMEOUT_MS,
DEFAULT_SHUTDOWN_TIMEOUT_MS);
- if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+ long endTimeMs =
+ startTimeMs +
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS,
Server.DEFAULT_SHUTDOWN_TIMEOUT_MS);
+ if (_serverConf
+ .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
shutdownQueryCheck(endTimeMs);
}
_helixManager.disconnect();
_serverInstance.shutDown();
- if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK,
DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+ if (_serverConf
+ .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK,
Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
shutdownResourceCheck(endTimeMs);
}
_realtimeLuceneIndexRefreshState.stop();
@@ -472,8 +432,9 @@ public class HelixServerStarter implements ServiceStartable {
LOGGER.info("Starting shutdown query check");
long startTimeMs = System.currentTimeMillis();
- long maxQueryTimeMs =
_serverConf.getProperty(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
- long noQueryThresholdMs =
_serverConf.getProperty(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS,
maxQueryTimeMs);
+ long maxQueryTimeMs =
+ _serverConf.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT,
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+ long noQueryThresholdMs =
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS,
maxQueryTimeMs);
// Wait until no incoming queries
boolean noIncomingQueries = false;
@@ -553,8 +514,8 @@ public class HelixServerStarter implements ServiceStartable {
}
}
- long checkIntervalMs = _serverConf
- .getProperty(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS,
DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+ long checkIntervalMs =
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS,
+ Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
while (System.currentTimeMillis() < endTimeMs) {
Iterator<String> iterator = resourcesToMonitor.iterator();
String currentResource = null;
@@ -639,9 +600,9 @@ public class HelixServerStarter implements ServiceStartable {
throws Exception {
Map<String, Object> properties = new HashMap<>();
int port = 8003;
- properties.put(KEY_OF_SERVER_NETTY_PORT, port);
- properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port
+ "/index");
- properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test"
+ port + "/segmentTar");
+ properties.put(Helix.KEY_OF_SERVER_NETTY_PORT, port);
+ properties.put(Server.CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test"
+ port + "/index");
+ properties.put(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
"/tmp/PinotServer/test" + port + "/segmentTar");
HelixServerStarter serverStarter =
new HelixServerStarter("quickstart", "localhost:2191", new
PinotConfiguration(properties));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]