Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2440#discussion_r200026708
  
    --- Diff: 
store/core/src/main/java/org/apache/carbondata/store/master/Master.java ---
    @@ -0,0 +1,530 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.store.master;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.lang.ref.SoftReference;
    +import java.net.BindException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.block.Distributable;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
    +import org.apache.carbondata.core.exception.InvalidConfigurationException;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.locks.CarbonLockUtil;
    +import org.apache.carbondata.core.locks.ICarbonLock;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.SegmentFileStore;
    +import org.apache.carbondata.core.metadata.converter.SchemaConverter;
    +import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    +import org.apache.carbondata.core.statusmanager.SegmentStatus;
    +import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.core.writer.ThriftWriter;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.hadoop.api.CarbonInputFormat;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +import org.apache.carbondata.processing.util.CarbonLoaderUtil;
    +import org.apache.carbondata.store.conf.StoreConf;
    +import org.apache.carbondata.store.exception.ExecutionTimeoutException;
    +import org.apache.carbondata.store.exception.StoreException;
    +import org.apache.carbondata.store.rest.controller.Horizon;
    +import org.apache.carbondata.store.rpc.RegistryService;
    +import org.apache.carbondata.store.rpc.ServiceFactory;
    +import org.apache.carbondata.store.rpc.StoreService;
    +import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl;
    +import org.apache.carbondata.store.rpc.impl.Status;
    +import org.apache.carbondata.store.rpc.model.BaseResponse;
    +import org.apache.carbondata.store.rpc.model.LoadDataRequest;
    +import org.apache.carbondata.store.rpc.model.QueryRequest;
    +import org.apache.carbondata.store.rpc.model.QueryResponse;
    +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
    +import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
    +import org.apache.carbondata.store.rpc.model.ShutdownRequest;
    +import org.apache.carbondata.store.scheduler.Schedulable;
    +import org.apache.carbondata.store.scheduler.Scheduler;
    +import org.apache.carbondata.store.util.StoreUtil;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.ipc.RPC;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +/**
    + * Master of CarbonSearch.
    + * It provides a Registry service for worker to register.
    + * And it provides search API to fire RPC call to workers.
    + */
    +
    +public class Master {
    +
    +  private static Master instance = null;
    +
    +  private static LogService LOGGER = 
LogServiceFactory.getLogService(Master.class.getName());
    +
    +  private Map<String, SoftReference<CarbonTable>> cacheTables;
    +
    +  // worker host address map to EndpointRef
    +  private StoreConf conf;
    +  private Configuration hadoopConf;
    +  private Random random = new Random();
    +  private RPC.Server registryServer = null;
    +  private Scheduler scheduler = new Scheduler();
    +
    +  private Master(StoreConf conf) {
    +    cacheTables = new HashMap<>();
    +    this.conf = conf;
    +    this.hadoopConf = this.conf.newHadoopConf();
    +  }
    +
/**
 * Starts the master by launching the registry service.
 *
 * NOTE(review): a startup failure is only logged here, not propagated, so a
 * caller of start() cannot tell the master failed to come up — confirm this
 * fire-and-forget behavior is intended; otherwise call startService() directly.
 */
public void start() {
  try {
    startService();
  } catch (IOException e) {
    LOGGER.error(e, "master failed to start");
  }
}
    +
    +  /**
    +   * start service and listen on port passed in constructor
    +   */
    +  public void startService() throws IOException {
    +    if (registryServer == null) {
    +
    +      BindException exception;
    +      // we will try to create service at worse case 100 times
    +      int numTry = 100;
    +      String host = conf.masterHost();
    +      int port = conf.masterPort();
    +      LOGGER.info("building registry-service on " + host + ":" + port);
    +
    +      RegistryService registryService = new RegistryServiceImpl(this);
    +      do {
    +        try {
    +          registryServer = new 
RPC.Builder(hadoopConf).setBindAddress(host).setPort(port)
    +              
.setProtocol(RegistryService.class).setInstance(registryService).build();
    +
    +          registryServer.start();
    +          numTry = 0;
    +          exception = null;
    +        } catch (BindException e) {
    +          // port is occupied, increase the port number and try again
    +          exception = e;
    +          LOGGER.error(e, "start registry-service failed");
    +          port = port + 1;
    +          numTry = numTry - 1;
    +        }
    +      } while (numTry > 0);
    +      if (exception != null) {
    +        // we have tried many times, but still failed to find an available 
port
    +        throw exception;
    +      }
    +      LOGGER.info("registry-service started");
    +    } else {
    +      LOGGER.info("Search mode master has already started");
    +    }
    +  }
    +
    +  public void stopService() throws InterruptedException {
    +    if (registryServer != null) {
    +      registryServer.stop();
    +      registryServer.join();
    +      registryServer = null;
    +    }
    +  }
    +
/**
 * Sends a shutdown request to every registered worker and removes each one
 * from the scheduler.
 *
 * NOTE(review): the first worker whose shutdown throws aborts the whole loop,
 * leaving the remaining workers running and still registered — confirm this
 * is intended rather than best-effort shutdown of all workers.
 * NOTE(review): if getWorkers() (not visible in this chunk) returns a live
 * view of the scheduler's collection, removeWorker() inside the loop may
 * cause a ConcurrentModificationException — verify it returns a copy.
 *
 * @throws IOException wrapping the first failure reported by a worker
 */
public void stopAllWorkers() throws IOException {
  for (Schedulable worker : getWorkers()) {
    try {
      worker.service.shutdown(new ShutdownRequest("user"));
    } catch (Throwable throwable) {
      throw new IOException(throwable);
    }
    scheduler.removeWorker(worker.getAddress());
  }
}
    +
    +  /**
    +   * A new searcher is trying to register, add it to the map and connect 
to this searcher
    +   */
    +  public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) 
throws IOException {
    +    LOGGER.info(
    +        "Receive Register request from worker " + request.getHostAddress() 
+ ":" + request.getPort()
    +            + " with " + request.getCores() + " cores");
    +    String workerId = UUID.randomUUID().toString();
    +    String workerAddress = request.getHostAddress();
    +    int workerPort = request.getPort();
    +    LOGGER.info(
    +        "connecting to worker " + request.getHostAddress() + ":" + 
request.getPort() + ", workerId "
    +            + workerId);
    +
    +    StoreService searchService = 
ServiceFactory.createStoreService(workerAddress, workerPort);
    +    scheduler.addWorker(
    +        new Schedulable(workerId, workerAddress, workerPort, 
request.getCores(), searchService));
    +    LOGGER.info("worker " + request + " registered");
    +    return new RegisterWorkerResponse(workerId);
    +  }
    +
/**
 * Collects the rows of one successful worker response into {@code output}.
 *
 * Validates that the response matches the request (query id and SUCCESS
 * status), then copies rows until the response is exhausted or the row count
 * reaches {@code globalLimit}.
 *
 * NOTE(review): the limit is compared against the row count of THIS response
 * only, not against output.size(). If output already holds rows accumulated
 * from other workers, the global limit can be exceeded overall — confirm
 * whether the caller accounts for that or whether this should compare
 * against output.size().
 *
 * @param queryId     id of the query this response must belong to
 * @param result      response received from a worker
 * @param output      accumulator the rows are appended to
 * @param globalLimit maximum number of rows to take from this response
 * @return number of rows appended to {@code output}
 * @throws IOException if the response is for a different query or reports failure
 */
private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit)
    throws IOException {
  // in case of RPC success, collect all rows in response message
  if (result.getQueryId() != queryId) {
    throw new IOException(
        "queryId in response does not match request: " + result.getQueryId() + " != " + queryId);
  }
  if (result.getStatus() != Status.SUCCESS.ordinal()) {
    throw new IOException("failure in worker: " + result.getMessage());
  }
  int rowCount = 0;
  Object[][] rows = result.getRows();
  for (Object[] row : rows) {
    output.add(new CarbonRow(row));
    rowCount++;
    if (rowCount >= globalLimit) {
      break;
    }
  }
  LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + rowCount);
  return rowCount;
}
    +
    +  private void onFailure(Throwable e) throws IOException {
    +    throw new IOException("exception in worker: " + e.getMessage());
    +  }
    +
/**
 * Invoked when a worker RPC does not complete within the allotted time.
 *
 * @throws ExecutionTimeoutException always (unchecked)
 */
private void onTimeout() {
  throw new ExecutionTimeoutException();
}
    +
    +  public String getTableFolder(String database, String tableName) {
    +    return conf.storeLocation() + File.separator + database + 
File.separator + tableName;
    +  }
    +
    +  public CarbonTable getTable(String database, String tableName) throws 
StoreException {
    +    String tablePath = getTableFolder(database, tableName);
    +    CarbonTable carbonTable;
    +    SoftReference<CarbonTable> reference = cacheTables.get(tablePath);
    +    if (reference != null) {
    +      carbonTable = reference.get();
    +      if (carbonTable != null) {
    +        return carbonTable;
    +      }
    +    }
    +
    +    try {
    +      org.apache.carbondata.format.TableInfo tableInfo =
    +          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
    +      SchemaConverter schemaConverter = new 
ThriftWrapperSchemaConverterImpl();
    +      TableInfo tableInfo1 = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
    +      tableInfo1.setTablePath(tablePath);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo1);
    +      cacheTables.put(tablePath, new 
SoftReference<CarbonTable>(carbonTable));
    +      return carbonTable;
    +    } catch (IOException e) {
    +      String message = "Failed to get table from " + tablePath;
    +      LOGGER.error(e, message);
    +      throw new StoreException(message);
    +    }
    +  }
    +
    +  public boolean createTable(TableInfo tableInfo, boolean ifNotExists) 
throws IOException {
    +    AbsoluteTableIdentifier identifier = 
tableInfo.getOrCreateAbsoluteTableIdentifier();
    +    boolean tableExists = 
FileFactory.isFileExist(identifier.getTablePath());
    +    if (tableExists) {
    +      if (ifNotExists) {
    +        return true;
    +      } else {
    +        throw new IOException(
    +            "car't create table " + tableInfo.getDatabaseName() + "." + 
tableInfo.getFactTable()
    +                .getTableName() + ", because it already exists");
    +      }
    +    }
    +
    +    SchemaConverter schemaConverter = new 
ThriftWrapperSchemaConverterImpl();
    +    String databaseName = tableInfo.getDatabaseName();
    +    String tableName = tableInfo.getFactTable().getTableName();
    +    org.apache.carbondata.format.TableInfo thriftTableInfo =
    +        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, 
databaseName, tableName);
    +
    +    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
    +    String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath);
    +    FileFactory.FileType fileType = 
FileFactory.getFileType(schemaMetadataPath);
    +    try {
    +      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
    +        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, 
fileType);
    +        if (!isDirCreated) {
    +          throw new IOException("Failed to create the metadata directory " 
+ schemaMetadataPath);
    +        }
    +      }
    +      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
    +      thriftWriter.open(FileWriteOperation.OVERWRITE);
    +      thriftWriter.write(thriftTableInfo);
    +      thriftWriter.close();
    +      return true;
    +    } catch (IOException e) {
    +      LOGGER.error(e, "Failed to handle create table");
    +      throw e;
    +    }
    +  }
    +
    +  public void openSegment(CarbonLoadModel loadModel, boolean 
isOverwriteTable) throws IOException {
    --- End diff --
    
    Please make the visibility as restrictive as possible; as of now, I think it is better to 
expose only createTable, loadData, etc. in Master


---

Reply via email to