Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2148#discussion_r181620436
--- Diff: store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.master;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.store.protocol.EchoRequest;
+import org.apache.carbondata.store.protocol.EchoResponse;
+import org.apache.carbondata.store.protocol.SearchRequest;
+import org.apache.carbondata.store.protocol.SearchResult;
+import org.apache.carbondata.store.protocol.ShutdownRequest;
+import org.apache.carbondata.store.protocol.ShutdownResponse;
+import org.apache.carbondata.store.protocol.WorkerGrpc;
+import org.apache.carbondata.store.util.GrpcSerdes;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Master of CarbonSearch.
+ * It listens on {@link Master#DEFAULT_PORT} and waits for workers to register.
+ * It also provides a search API that fires RPC calls to the workers.
+ */
[email protected]
+public class Master {
+
+ private static final LogService LOG = LogServiceFactory.getLogService(Master.class.getName());
+
+ public static final int DEFAULT_PORT = 10020;
+
+ private Server registryServer;
+
+ private int port;
+
+ private Random random = new Random();
+
+ /** mapping of worker hostname to rpc stub */
+ private Map<String, WorkerGrpc.WorkerFutureStub> workers;
+
+ public Master() {
+ this(DEFAULT_PORT);
+ }
+
+ public Master(int port) {
+ this.port = port;
+ this.workers = new ConcurrentHashMap<>();
+ }
+
+ /** start service and listen on port passed in constructor */
+ public void startService() throws IOException {
+ if (registryServer == null) {
+ /* The port on which the registryServer should run */
+ registryServer = ServerBuilder.forPort(port)
+ .addService(new RegistryService(this))
+ .build()
+ .start();
+ LOG.info("Master started, listening on " + port);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override public void run() {
+ // Note: by this point the logger may have been reset by its own JVM shutdown hook.
+ LOG.info("*** shutting down gRPC Master since JVM is shutting down");
+ stopService();
+ LOG.info("*** Master shut down");
+ }
+ });
+ }
+ }
+
+ public void stopService() {
+ if (registryServer != null) {
+ registryServer.shutdown();
+ }
+ }
+
+ public void stopAllWorkers() throws IOException, ExecutionException, InterruptedException {
+ ShutdownRequest request = ShutdownRequest.newBuilder()
+ .setTrigger(ShutdownRequest.Trigger.USER)
+ .build();
+ for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : workers.entrySet()) {
+ ListenableFuture<ShutdownResponse> future = worker.getValue().shutdown(request);
+ ShutdownResponse response = future.get();
+ if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
+ LOG.error("failed to shutdown worker: " + response.getMessage());
+ throw new IOException(response.getMessage());
+ } else {
+ workers.remove(worker.getKey());
+ }
+ }
+ }
+
+ /**
+ * Await termination on the main thread since the grpc library uses daemon threads.
+ */
+ private void blockUntilShutdown() throws InterruptedException {
+ if (registryServer != null) {
+ registryServer.awaitTermination();
+ }
+ }
+
+ /** A new searcher is trying to register, add it to the map and connect to this searcher */
+ void addWorker(String workerHostname, int port, int cores)
+ throws ExecutionException, InterruptedException {
+ Objects.requireNonNull(workerHostname);
+
+ LOG.info("trying to connect to searcher " + workerHostname + ":" +
port);
+ ManagedChannel channelToWorker = ManagedChannelBuilder.forAddress(workerHostname, port)
+ .usePlaintext(true)
+ .maxInboundMessageSize(200 * 1000 * 1000)
+ .build();
+ WorkerGrpc.WorkerFutureStub futureStub = WorkerGrpc.newFutureStub(channelToWorker);
+
+ // try to send a message to worker as a test
+ tryEcho(futureStub);
+ workers.put(workerHostname, futureStub);
+ }
+
+ private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
+ throws ExecutionException, InterruptedException {
+ EchoRequest request = EchoRequest.newBuilder().setMessage("hello").build();
+ LOG.info("echo to searcher: " + request.getMessage());
+ ListenableFuture<EchoResponse> response = stub.echo(request);
+ try {
+ LOG.info("echo from searcher: " + response.get().getMessage());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("failed to echo: " + e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Executes a search by firing RPC calls to the workers, and returns the result rows.
+ */
+ public CarbonRow[] search(CarbonTable table, String[] columns, Expression filter)
+ throws IOException, InvalidConfigurationException, ExecutionException, InterruptedException {
+ Objects.requireNonNull(table);
+ Objects.requireNonNull(columns);
+
+ if (workers.size() == 0) {
+ throw new IOException("No searcher is available");
+ }
+
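+ // random id for this query; used below to verify that each RPC response matches this request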
+ int queryId = random.nextInt();
+
+ // Build a SearchRequest
+ SearchRequest.Builder builder = SearchRequest.newBuilder()
+ .setQueryId(queryId)
+ .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
+ for (String column : columns) {
+ builder.addProjectColumns(column);
+ }
+ if (filter != null) {
+ builder.setFilterExpression(GrpcSerdes.serialize(filter));
+ }
+
+ // prune data and get a mapping of worker hostname to list of blocks,
+ // add these blocks to the SearchRequest and fire the RPC call
+ Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, columns, filter);
+
+ List<ListenableFuture<SearchResult>> futures = new ArrayList<>(nodeBlockMapping.size());
+
+ for (Map.Entry<String, List<Distributable>> entry : nodeBlockMapping.entrySet()) {
+ String hostname = entry.getKey();
+ List<Distributable> blocks = entry.getValue();
+ CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, hostname);
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutput dataOutput = new DataOutputStream(stream);
+ mbSplit.write(dataOutput);
+ builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
+
+ SearchRequest request = builder.build();
+
+ // do RPC to worker asynchronously and concurrently
+ ListenableFuture<SearchResult> future = workers.get(hostname).search(request);
+ futures.add(future);
+ }
+
+ // get all results from RPC response and return to caller
+ List<CarbonRow> output = new LinkedList<>();
+ for (ListenableFuture<SearchResult> future : futures) {
+ SearchResult result = future.get();
+ if (result.getQueryId() != queryId) {
+ throw new IOException(String.format(
+ "queryId in response does not match request: %d != %d",
result.getQueryId(), queryId));
+ }
+ collectResult(result, output);
+ }
+ return output.toArray(new CarbonRow[output.size()]);
+ }
+
+ /**
+ * Prunes data by using CarbonTableInputFormat.getSplits.
+ * Returns a mapping of hostname to list of blocks.
+ */
+ private Map<String, List<Distributable>> pruneBlock(CarbonTable table, String[] columns,
+ Expression filter) throws IOException, InvalidConfigurationException {
+ JobConf jobConf = new JobConf(new Configuration());
+ Job job = new Job(jobConf);
+ CarbonTableInputFormat<Object> format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+ job, table, columns, filter, null, null);
+
+ List<InputSplit> splits = format.getSplits(job);
+ List<Distributable> distributables = new ArrayList<>(splits.size());
+ for (InputSplit split : splits) {
+ distributables.add(((CarbonInputSplit)split));
+ }
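+ // map the pruned blocks to registered worker hosts (-1 appears to mean: do not cap the node count)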
+ return CarbonLoaderUtil.nodeBlockMapping(
+ distributables, -1, new ArrayList<String>(workers.keySet()),
+ CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+ }
+
+ /**
+ * Fill result rows into the given output list
+ */
+ private void collectResult(SearchResult result, List<CarbonRow> output) throws IOException {
+ for (ByteString bytes : result.getRowList()) {
+ CarbonRow row = GrpcSerdes.deserialize(bytes);
+ output.add(row);
+ }
+ }
+
+ /** return hostnames of all workers */
+ public Set<String> getWorkers() {
+ return workers.keySet();
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
--- End diff ---
What is the use of `main` here? Is it for testing?
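If it is only meant for manual testing, it might be cleaner to drive the public API from a test instead. A hypothetical sketch, based only on the methods shown in this diff (`table` is assumed to be a `CarbonTable` prepared elsewhere, `"name"` is a made-up projection column, and the enclosing test method would declare the checked exceptions):

```java
// Hypothetical sketch: exercising Master without a main() entry point.
Master master = new Master();   // will listen on DEFAULT_PORT (10020)
master.startService();          // start the gRPC registry service
// ... wait here until at least one worker has registered itself ...
CarbonRow[] rows = master.search(table, new String[]{"name"}, null);
master.stopAllWorkers();
master.stopService();
```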
---