Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2440#discussion_r200026708
--- Diff:
store/core/src/main/java/org/apache/carbondata/store/master/Master.java ---
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.master;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.store.conf.StoreConf;
+import org.apache.carbondata.store.exception.ExecutionTimeoutException;
+import org.apache.carbondata.store.exception.StoreException;
+import org.apache.carbondata.store.rest.controller.Horizon;
+import org.apache.carbondata.store.rpc.RegistryService;
+import org.apache.carbondata.store.rpc.ServiceFactory;
+import org.apache.carbondata.store.rpc.StoreService;
+import org.apache.carbondata.store.rpc.impl.RegistryServiceImpl;
+import org.apache.carbondata.store.rpc.impl.Status;
+import org.apache.carbondata.store.rpc.model.BaseResponse;
+import org.apache.carbondata.store.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.rpc.model.QueryRequest;
+import org.apache.carbondata.store.rpc.model.QueryResponse;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.scheduler.Schedulable;
+import org.apache.carbondata.store.scheduler.Scheduler;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Master of CarbonSearch.
+ * It provides a Registry service for worker to register.
+ * And it provides search API to fire RPC call to workers.
+ */
+
+public class Master {
+
+ private static Master instance = null;
+
+ private static LogService LOGGER =
LogServiceFactory.getLogService(Master.class.getName());
+
+ private Map<String, SoftReference<CarbonTable>> cacheTables;
+
+ // worker host address map to EndpointRef
+ private StoreConf conf;
+ private Configuration hadoopConf;
+ private Random random = new Random();
+ private RPC.Server registryServer = null;
+ private Scheduler scheduler = new Scheduler();
+
+ private Master(StoreConf conf) {
+ cacheTables = new HashMap<>();
+ this.conf = conf;
+ this.hadoopConf = this.conf.newHadoopConf();
+ }
+
  /**
   * Starts the registry service.
   * NOTE(review): an IOException from startService() is swallowed here and
   * only logged, so callers cannot tell that startup failed -- confirm this
   * is intentional.
   */
  public void start() {
    try {
      startService();
    } catch (IOException e) {
      LOGGER.error(e, "master failed to start");
    }
  }
+
+ /**
+ * start service and listen on port passed in constructor
+ */
+ public void startService() throws IOException {
+ if (registryServer == null) {
+
+ BindException exception;
+ // we will try to create service at worse case 100 times
+ int numTry = 100;
+ String host = conf.masterHost();
+ int port = conf.masterPort();
+ LOGGER.info("building registry-service on " + host + ":" + port);
+
+ RegistryService registryService = new RegistryServiceImpl(this);
+ do {
+ try {
+ registryServer = new
RPC.Builder(hadoopConf).setBindAddress(host).setPort(port)
+
.setProtocol(RegistryService.class).setInstance(registryService).build();
+
+ registryServer.start();
+ numTry = 0;
+ exception = null;
+ } catch (BindException e) {
+ // port is occupied, increase the port number and try again
+ exception = e;
+ LOGGER.error(e, "start registry-service failed");
+ port = port + 1;
+ numTry = numTry - 1;
+ }
+ } while (numTry > 0);
+ if (exception != null) {
+ // we have tried many times, but still failed to find an available
port
+ throw exception;
+ }
+ LOGGER.info("registry-service started");
+ } else {
+ LOGGER.info("Search mode master has already started");
+ }
+ }
+
+ public void stopService() throws InterruptedException {
+ if (registryServer != null) {
+ registryServer.stop();
+ registryServer.join();
+ registryServer = null;
+ }
+ }
+
+ public void stopAllWorkers() throws IOException {
+ for (Schedulable worker : getWorkers()) {
+ try {
+ worker.service.shutdown(new ShutdownRequest("user"));
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
+ }
+ scheduler.removeWorker(worker.getAddress());
+ }
+ }
+
+ /**
+ * A new searcher is trying to register, add it to the map and connect
to this searcher
+ */
+ public RegisterWorkerResponse addWorker(RegisterWorkerRequest request)
throws IOException {
+ LOGGER.info(
+ "Receive Register request from worker " + request.getHostAddress()
+ ":" + request.getPort()
+ + " with " + request.getCores() + " cores");
+ String workerId = UUID.randomUUID().toString();
+ String workerAddress = request.getHostAddress();
+ int workerPort = request.getPort();
+ LOGGER.info(
+ "connecting to worker " + request.getHostAddress() + ":" +
request.getPort() + ", workerId "
+ + workerId);
+
+ StoreService searchService =
ServiceFactory.createStoreService(workerAddress, workerPort);
+ scheduler.addWorker(
+ new Schedulable(workerId, workerAddress, workerPort,
request.getCores(), searchService));
+ LOGGER.info("worker " + request + " registered");
+ return new RegisterWorkerResponse(workerId);
+ }
+
+ private int onSuccess(int queryId, QueryResponse result, List<CarbonRow>
output, long globalLimit)
+ throws IOException {
+ // in case of RPC success, collect all rows in response message
+ if (result.getQueryId() != queryId) {
+ throw new IOException(
+ "queryId in response does not match request: " +
result.getQueryId() + " != " + queryId);
+ }
+ if (result.getStatus() != Status.SUCCESS.ordinal()) {
+ throw new IOException("failure in worker: " + result.getMessage());
+ }
+ int rowCount = 0;
+ Object[][] rows = result.getRows();
+ for (Object[] row : rows) {
+ output.add(new CarbonRow(row));
+ rowCount++;
+ if (rowCount >= globalLimit) {
+ break;
+ }
+ }
+ LOGGER.info("[QueryId:" + queryId + "] accumulated result size " +
rowCount);
+ return rowCount;
+ }
+
+ private void onFailure(Throwable e) throws IOException {
+ throw new IOException("exception in worker: " + e.getMessage());
+ }
+
  /**
   * Signals that a query did not finish within the allowed time by throwing
   * the store's unchecked timeout exception, so it propagates without being
   * declared by callers.
   */
  private void onTimeout() {
    throw new ExecutionTimeoutException();
  }
+
+ public String getTableFolder(String database, String tableName) {
+ return conf.storeLocation() + File.separator + database +
File.separator + tableName;
+ }
+
+ public CarbonTable getTable(String database, String tableName) throws
StoreException {
+ String tablePath = getTableFolder(database, tableName);
+ CarbonTable carbonTable;
+ SoftReference<CarbonTable> reference = cacheTables.get(tablePath);
+ if (reference != null) {
+ carbonTable = reference.get();
+ if (carbonTable != null) {
+ return carbonTable;
+ }
+ }
+
+ try {
+ org.apache.carbondata.format.TableInfo tableInfo =
+
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+ SchemaConverter schemaConverter = new
ThriftWrapperSchemaConverterImpl();
+ TableInfo tableInfo1 =
schemaConverter.fromExternalToWrapperTableInfo(tableInfo, "", "", "");
+ tableInfo1.setTablePath(tablePath);
+ carbonTable = CarbonTable.buildFromTableInfo(tableInfo1);
+ cacheTables.put(tablePath, new
SoftReference<CarbonTable>(carbonTable));
+ return carbonTable;
+ } catch (IOException e) {
+ String message = "Failed to get table from " + tablePath;
+ LOGGER.error(e, message);
+ throw new StoreException(message);
+ }
+ }
+
+ public boolean createTable(TableInfo tableInfo, boolean ifNotExists)
throws IOException {
+ AbsoluteTableIdentifier identifier =
tableInfo.getOrCreateAbsoluteTableIdentifier();
+ boolean tableExists =
FileFactory.isFileExist(identifier.getTablePath());
+ if (tableExists) {
+ if (ifNotExists) {
+ return true;
+ } else {
+ throw new IOException(
+ "car't create table " + tableInfo.getDatabaseName() + "." +
tableInfo.getFactTable()
+ .getTableName() + ", because it already exists");
+ }
+ }
+
+ SchemaConverter schemaConverter = new
ThriftWrapperSchemaConverterImpl();
+ String databaseName = tableInfo.getDatabaseName();
+ String tableName = tableInfo.getFactTable().getTableName();
+ org.apache.carbondata.format.TableInfo thriftTableInfo =
+ schemaConverter.fromWrapperToExternalTableInfo(tableInfo,
databaseName, tableName);
+
+ String schemaFilePath =
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+ String schemaMetadataPath =
CarbonTablePath.getFolderContainingFile(schemaFilePath);
+ FileFactory.FileType fileType =
FileFactory.getFileType(schemaMetadataPath);
+ try {
+ if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+ boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath,
fileType);
+ if (!isDirCreated) {
+ throw new IOException("Failed to create the metadata directory "
+ schemaMetadataPath);
+ }
+ }
+ ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+ thriftWriter.open(FileWriteOperation.OVERWRITE);
+ thriftWriter.write(thriftTableInfo);
+ thriftWriter.close();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error(e, "Failed to handle create table");
+ throw e;
+ }
+ }
+
+ public void openSegment(CarbonLoadModel loadModel, boolean
isOverwriteTable) throws IOException {
--- End diff --
Please minimize the visibility of these methods; for now, I think it is better
to expose only createTable, loadData, etc. in Master.
---