Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2148#discussion_r181632923
  
    --- Diff: 
store/search/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutput;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.protocol.EchoRequest;
    +import org.apache.carbondata.store.protocol.EchoResponse;
    +import org.apache.carbondata.store.protocol.SearchRequest;
    +import org.apache.carbondata.store.protocol.SearchResult;
    +import org.apache.carbondata.store.protocol.ShutdownRequest;
    +import org.apache.carbondata.store.protocol.ShutdownResponse;
    +import org.apache.carbondata.store.protocol.WorkerGrpc;
    +import org.apache.carbondata.store.util.GrpcSerdes;
    +
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.protobuf.ByteString;
    +import io.grpc.ManagedChannel;
    +import io.grpc.ManagedChannelBuilder;
    +import io.grpc.Server;
    +import io.grpc.ServerBuilder;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It listens to {@link Master#DEFAULT_PORT} to wait for worker to 
register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +@InterfaceAudience.Internal
    +public class Master {
    +
    +  private static final LogService LOG = 
LogServiceFactory.getLogService(Master.class.getName());
    +
    +  public static final int DEFAULT_PORT = 10020;
    +
    +  private Server registryServer;
    +
    +  private int port;
    +
    +  private Random random = new Random();
    +
    +  /** mapping of worker hostname to rpc stub */
    +  private Map<String, WorkerGrpc.WorkerFutureStub> workers;
    +
    +  public Master() {
    +    this(DEFAULT_PORT);
    +  }
    +
    +  public Master(int port) {
    +    this.port = port;
    +    this.workers = new ConcurrentHashMap<>();
    +  }
    +
    +  /** start service and listen on port passed in constructor */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +      /* The port on which the registryServer should run */
    +      registryServer = ServerBuilder.forPort(port)
    +          .addService(new RegistryService(this))
    +          .build()
    +          .start();
    +      LOG.info("Master started, listening on " + port);
    +      Runtime.getRuntime().addShutdownHook(new Thread() {
    +        @Override public void run() {
    +          // Use stderr here since the logger may have been reset by its 
JVM shutdown hook.
    +          LOG.info("*** shutting down gRPC Master since JVM is shutting 
down");
    +          stopService();
    +          LOG.info("*** Master shut down");
    +        }
    +      });
    +    }
    +  }
    +
    +  public void stopService() {
    +    if (registryServer != null) {
    +      registryServer.shutdown();
    +    }
    +  }
    +
    +  public void stopAllWorkers() throws IOException, ExecutionException, 
InterruptedException {
    +    ShutdownRequest request = ShutdownRequest.newBuilder()
    +        .setTrigger(ShutdownRequest.Trigger.USER)
    +        .build();
    +    for (Map.Entry<String, WorkerGrpc.WorkerFutureStub> worker : 
workers.entrySet()) {
    +      ListenableFuture<ShutdownResponse> future = 
worker.getValue().shutdown(request);
    +      ShutdownResponse response = future.get();
    +      if (response.getStatus() != ShutdownResponse.Status.SUCCESS) {
    +        LOG.error("failed to shutdown worker: " + response.getMessage());
    +        throw new IOException(response.getMessage());
    +      } else {
    +        workers.remove(worker.getKey());
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Await termination on the main thread since the grpc library uses 
daemon threads.
    +   */
    +  private void blockUntilShutdown() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.awaitTermination();
    +    }
    +  }
    +
    +  /** A new searcher is trying to register, add it to the map and connect 
to this searcher */
    +  void addWorker(String workerHostname, int port, int cores)
    +      throws ExecutionException, InterruptedException {
    +    Objects.requireNonNull(workerHostname);
    +
    +    LOG.info("trying to connect to searcher " + workerHostname + ":" + 
port);
    +    ManagedChannel channelToWorker = 
ManagedChannelBuilder.forAddress(workerHostname, port)
    +        .usePlaintext(true)
    +        .maxInboundMessageSize(200 * 1000 * 1000)
    +        .build();
    +    WorkerGrpc.WorkerFutureStub futureStub = 
WorkerGrpc.newFutureStub(channelToWorker);
    +
    +    // try to send a message to worker as a test
    +    tryEcho(futureStub);
    +    workers.put(workerHostname, futureStub);
    +  }
    +
    +  private void tryEcho(WorkerGrpc.WorkerFutureStub stub)
    +      throws ExecutionException, InterruptedException {
    +    EchoRequest request = 
EchoRequest.newBuilder().setMessage("hello").build();
    +    LOG.info("echo to searcher: " + request.getMessage());
    +    ListenableFuture<EchoResponse> response = stub.echo(request);
    +    try {
    +      LOG.info("echo from searcher: " + response.get().getMessage());
    +    } catch (InterruptedException | ExecutionException e) {
    +      LOG.error("failed to echo: " + e.getMessage());
    +      throw e;
    +    }
    +  }
    +
    +  /**
    +   * Execute search by firing RPC call to worker, return the result rows
    +   */
    +  public CarbonRow[] search(CarbonTable table, String[] columns, 
Expression filter)
    +      throws IOException, InvalidConfigurationException, 
ExecutionException, InterruptedException {
    +    Objects.requireNonNull(table);
    +    Objects.requireNonNull(columns);
    +
    +    if (workers.size() == 0) {
    +      throw new IOException("No searcher is available");
    +    }
    +
    +    int queryId = random.nextInt();
    +
    +    // Build a SearchRequest
    +    SearchRequest.Builder builder = SearchRequest.newBuilder()
    +        .setQueryId(queryId)
    +        .setTableInfo(GrpcSerdes.serialize(table.getTableInfo()));
    +    for (String column : columns) {
    +      builder.addProjectColumns(column);
    +    }
    +    if (filter != null) {
    +      builder.setFilterExpression(GrpcSerdes.serialize(filter));
    +    }
    +
    +    // prune data and get a mapping of worker hostname to list of blocks,
    +    // add these blocks to the SearchRequest and fire the RPC call
    +    Map<String, List<Distributable>> nodeBlockMapping = pruneBlock(table, 
columns, filter);
    +
    +    List<ListenableFuture<SearchResult>> futures = new 
ArrayList<>(nodeBlockMapping.size());
    +
    +    for (Map.Entry<String, List<Distributable>> entry : 
nodeBlockMapping.entrySet()) {
    +      String hostname = entry.getKey();
    +      List<Distributable> blocks = entry.getValue();
    +      CarbonMultiBlockSplit mbSplit = new CarbonMultiBlockSplit(blocks, 
hostname);
    +      ByteArrayOutputStream stream = new ByteArrayOutputStream();
    +      DataOutput dataOutput = new DataOutputStream(stream);
    +      mbSplit.write(dataOutput);
    +      builder.setSplits(ByteString.copyFrom(stream.toByteArray()));
    +
    +      SearchRequest request = builder.build();
    +
    +      // do RPC to worker asynchronously and concurrently
    +      ListenableFuture<SearchResult> future = 
workers.get(hostname).search(request);
    +      futures.add(future);
    +    }
    +
    +    // get all results from RPC response and return to caller
    +    List<CarbonRow> output = new LinkedList<>();
    +    for (ListenableFuture<SearchResult> future : futures) {
    +      SearchResult result = future.get();
    +      if (result.getQueryId() != queryId) {
    +        throw new IOException(String.format(
    +            "queryId in response does not match request: %d != %d", 
result.getQueryId(), queryId));
    +      }
    +      collectResult(result, output);
    +    }
    +    return output.toArray(new CarbonRow[output.size()]);
    +  }
    +
    +  /**
    +   * Prune data by using CarbonInputFormat.getSplit
    +   * Return a mapping of hostname to list of block
    +   */
    +  private Map<String, List<Distributable>> pruneBlock(CarbonTable table, 
String[] columns,
    +      Expression filter) throws IOException, InvalidConfigurationException 
{
    +    JobConf jobConf = new JobConf(new Configuration());
    +    Job job = new Job(jobConf);
    +    CarbonTableInputFormat<Object> format = 
CarbonInputFormatUtil.createCarbonTableInputFormat(
    +        job, table, columns, filter, null, null);
    +
    +    List<InputSplit> splits = format.getSplits(job);
    +    List<Distributable> distributables = new ArrayList<>(splits.size());
    +    for (InputSplit split : splits) {
    +      distributables.add(((CarbonInputSplit)split));
    +    }
    +    return CarbonLoaderUtil.nodeBlockMapping(
    +        distributables, -1, new ArrayList<String>(workers.keySet()),
    +        CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
    +  }
    +
    +  /**
    +   * Fill result row to {@param output}
    +   */
    +  private void collectResult(SearchResult result,  List<CarbonRow> output) 
throws IOException {
    +    for (ByteString bytes : result.getRowList()) {
    +      CarbonRow row = GrpcSerdes.deserialize(bytes);
    +      output.add(row);
    +    }
    +  }
    +
    +  /** return hostname of all workers */
    +  public Set<String> getWorkers() {
    +    return workers.keySet();
    +  }
    +
    +  public static void main(String[] args) throws IOException, 
InterruptedException {
    --- End diff --
    
    yes


---

Reply via email to