http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java 
b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
deleted file mode 100644
index da2a697..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.conf;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-public class StoreConf implements Serializable, Writable {
-
-  public static final String SELECT_PROJECTION = "carbon.select.projection";
-  public static final String SELECT_FILTER = "carbon.select.filter";
-  public static final String SELECT_LIMIT = "carbon.select.limit";
-
-  public static final String SELECT_ID = "carbon.select.id";
-
-  public static final String WORKER_HOST = "carbon.worker.host";
-  public static final String WORKER_PORT = "carbon.worker.port";
-  public static final String WORKER_CORE_NUM = "carbon.worker.core.num";
-  public static final String MASTER_HOST = "carbon.master.host";
-  public static final String MASTER_PORT = "carbon.master.port";
-
-  public static final String STORE_TEMP_LOCATION = 
"carbon.store.temp.location";
-  public static final String STORE_LOCATION = "carbon.store.location";
-
-  private Map<String, String> conf = new HashMap<>();
-
-  public StoreConf() {
-  }
-
-  public StoreConf(String filePath) {
-    load(filePath);
-  }
-
-  public StoreConf conf(String key, String value) {
-    conf.put(key, value);
-    return this;
-  }
-
-  public StoreConf conf(String key, int value) {
-    conf.put(key, "" + value);
-    return this;
-  }
-
-  public void load(String filePath) {
-    StoreUtil.loadProperties(filePath, this);
-  }
-
-  public void conf(StoreConf conf) {
-    this.conf.putAll(conf.conf);
-  }
-
-  public Object conf(String key) {
-    return conf.get(key);
-  }
-
-  public String[] projection() {
-    return stringArrayValue(SELECT_PROJECTION);
-  }
-
-  public String filter() {
-    return stringValue(SELECT_FILTER);
-  }
-
-  public int limit() {
-    return intValue(SELECT_LIMIT);
-  }
-
-  public String masterHost() {
-    return stringValue(MASTER_HOST);
-  }
-
-  public int masterPort() {
-    return intValue(MASTER_PORT);
-  }
-
-  public String workerHost() {
-    return stringValue(WORKER_HOST);
-  }
-
-  public int workerPort() {
-    return intValue(WORKER_PORT);
-  }
-
-  public int workerCoreNum() {
-    return intValue(WORKER_CORE_NUM);
-  }
-
-  public String storeLocation() {
-    return stringValue(STORE_LOCATION);
-  }
-
-  public String[] storeTempLocation() {
-    return stringArrayValue(STORE_TEMP_LOCATION);
-  }
-
-  public String selectId() {
-    return stringValue(SELECT_ID);
-  }
-
-  public Configuration newHadoopConf() {
-    Configuration hadoopConf = FileFactory.getConfiguration();
-    for (Map.Entry<String, String> entry : conf.entrySet()) {
-      String key = entry.getKey();
-      String value = entry.getValue();
-      if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
-        hadoopConf.set(key.substring("carbon.hadoop.".length()), value);
-      }
-    }
-    return hadoopConf;
-  }
-
-  private String stringValue(String key) {
-    Object obj = conf.get(key);
-    if (obj == null) {
-      return null;
-    }
-    return obj.toString();
-  }
-
-  private int intValue(String key) {
-    String value = conf.get(key);
-    if (value == null) {
-      return -1;
-    }
-    return Integer.parseInt(value);
-  }
-
-  private String[] stringArrayValue(String key) {
-    String value = conf.get(key);
-    if (value == null) {
-      return null;
-    }
-    return value.split(",", -1);
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-    Set<Map.Entry<String, String>> entries = conf.entrySet();
-    WritableUtils.writeVInt(out, conf.size());
-    for (Map.Entry<String, String> entry : entries) {
-      WritableUtils.writeString(out, entry.getKey());
-      WritableUtils.writeString(out, entry.getValue());
-    }
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-    if (conf == null) {
-      conf = new HashMap<>();
-    }
-
-    int size = WritableUtils.readVInt(in);
-    String key, value;
-    for (int i = 0; i < size; i++) {
-      key = WritableUtils.readString(in);
-      value = WritableUtils.readString(in);
-      conf.put(key, value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
deleted file mode 100644
index c7a4d6b..0000000
--- 
a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.exception;
-
-public class ExecutionTimeoutException extends RuntimeException {
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
deleted file mode 100644
index c55fa7c..0000000
--- 
a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.exception;
-
-public class StoreException extends Exception {
-
-  public StoreException() {
-    super();
-  }
-
-  public StoreException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
 
b/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
deleted file mode 100644
index b366a67..0000000
--- 
a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.exception;
-
-public class WorkerTooBusyException extends RuntimeException {
-
-  public WorkerTooBusyException(String message) {
-    super(message);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
new file mode 100644
index 0000000..7e50102
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.store.api.CarbonStore;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Provides base functionality of CarbonStore, it contains basic 
implementation of metadata
+ * management, data pruning and data scan logic.
+ */
[email protected]
+public abstract class CarbonStoreBase implements CarbonStore {
+
+  private static LogService LOGGER =
+      
LogServiceFactory.getLogService(CarbonStoreBase.class.getCanonicalName());
+
+  MetaProcessor metaProcessor;
+  private StoreConf storeConf;
+
+  CarbonStoreBase(StoreConf storeConf) {
+    this.storeConf = storeConf;
+    this.metaProcessor = new MetaProcessor(this);
+  }
+
+  @Override
+  public void createTable(TableDescriptor table) throws IOException, 
StoreException {
+    Objects.requireNonNull(table);
+    metaProcessor.createTable(table);
+  }
+
+  @Override
+  public void dropTable(TableIdentifier table) throws IOException {
+    Objects.requireNonNull(table);
+    metaProcessor.dropTable(table);
+  }
+
+  @Override
+  public CarbonTable getTable(TableIdentifier table) throws IOException {
+    Objects.requireNonNull(table);
+    return metaProcessor.getTable(table);
+  }
+
+  public String getTablePath(String tableName, String databaseName) {
+    Objects.requireNonNull(tableName);
+    Objects.requireNonNull(databaseName);
+    return String.format("%s/%s", storeConf.storeLocation(), tableName);
+  }
+
+  /**
+   * Prune data by using CarbonInputFormat.getSplit
+   * Return a mapping of host address to list of block.
+   * This should be invoked in driver side.
+   */
+  public static List<Distributable> pruneBlock(CarbonTable table, String[] 
columns,
+      Expression filter) throws IOException {
+    Objects.requireNonNull(table);
+    Objects.requireNonNull(columns);
+    JobConf jobConf = new JobConf(new Configuration());
+    Job job = new Job(jobConf);
+    CarbonTableInputFormat format;
+    try {
+      format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+          job, table, columns, filter, null, null, true);
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    // We will do FG pruning in reader side, so don't do it here
+    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
+    List<InputSplit> splits = format.getSplits(job);
+    List<Distributable> blockInfos = new ArrayList<>(splits.size());
+    for (InputSplit split : splits) {
+      blockInfos.add((Distributable) split);
+    }
+    return blockInfos;
+  }
+
+  /**
+   * Scan data and return matched rows. This should be invoked in worker side.
+   * @param table carbon table
+   * @param scan scan parameter
+   * @return matched rows
+   * @throws IOException if IO error occurs
+   */
+  public static List<CarbonRow> scan(CarbonTable table, Scan scan) throws 
IOException {
+    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+    carbonTaskInfo.setTaskId(System.nanoTime());
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
+
+    CarbonMultiBlockSplit mbSplit = scan.getSplit();
+    long limit = scan.getLimit();
+    QueryModel queryModel = createQueryModel(table, scan);
+
+    LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", 
scan.getRequestId(),
+        queryModel.toString(), mbSplit.getAllSplits().size()));
+
+    // read all rows by the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try (CarbonRecordReader<CarbonRow> reader = new 
IndexedRecordReader(scan.getRequestId(),
+        table, queryModel)) {
+      reader.initialize(mbSplit, null);
+
+      // loop to read required number of rows.
+      // By default, if user does not specify the limit value, limit is 
Long.MaxValue
+      long rowCount = 0;
+      while (reader.nextKeyValue() && rowCount < limit) {
+        rows.add(reader.getCurrentValue());
+        rowCount++;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
+        scan.getRequestId(), rows.size()));
+    return rows;
+  }
+
+  private static QueryModel createQueryModel(CarbonTable table, Scan scan) {
+    String[] projectColumns = scan.getProjectColumns();
+    Expression filter = null;
+    if (scan.getFilterExpression() != null) {
+      filter = scan.getFilterExpression();
+    }
+    return new QueryModelBuilder(table)
+        .projectColumns(projectColumns)
+        .filterExpression(filter)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
new file mode 100644
index 0000000..3667aea
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.exception.ExecutionTimeoutException;
+import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.store.impl.master.Schedulable;
+import org.apache.carbondata.store.impl.master.Scheduler;
+import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
+import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+
+/**
+ * A CarbonStore that leverage multiple servers via RPC calls (Master and 
Workers)
+ */
[email protected]
[email protected]
+class DistributedCarbonStore extends CarbonStoreBase {
+  private static LogService LOGGER =
+      
LogServiceFactory.getLogService(DistributedCarbonStore.class.getCanonicalName());
+  private SegmentTxnManager txnManager;
+  private Scheduler scheduler;
+  private Random random = new Random();
+
+  DistributedCarbonStore(StoreConf storeConf) throws IOException {
+    super(storeConf);
+    this.scheduler = new Scheduler(storeConf);
+    txnManager = SegmentTxnManager.getInstance();
+  }
+
+  @Override
+  public void loadData(LoadDescriptor load) throws IOException, StoreException 
{
+    Objects.requireNonNull(load);
+    CarbonTable table = metaProcessor.getTable(load.getTable());
+    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
+    builder.setInputPath(load.getInputPath());
+    CarbonLoadModel loadModel;
+    try {
+      loadModel = builder.build(load.getOptions(), System.currentTimeMillis(), 
"0");
+    } catch (InvalidLoadOptionException e) {
+      LOGGER.error(e, "Invalid loadDescriptor options");
+      throw new StoreException(e);
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to loadDescriptor data");
+      throw e;
+    }
+
+    Schedulable worker = scheduler.pickNexWorker();
+    try {
+      if (loadModel.getFactTimeStamp() == 0) {
+        loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
+      }
+      txnManager.openSegment(loadModel, load.isOverwrite());
+      LoadDataRequest request = new LoadDataRequest(loadModel);
+      BaseResponse response = scheduler.sendRequest(worker, request);
+      if (Status.SUCCESS.ordinal() == response.getStatus()) {
+        txnManager.commitSegment(loadModel);
+      } else {
+        txnManager.closeSegment(loadModel);
+        throw new StoreException(response.getMessage());
+      }
+    } finally {
+      worker.workload.decrementAndGet();
+    }
+  }
+
+  @Override
+  public List<CarbonRow> select(SelectDescriptor select) throws IOException, 
StoreException {
+    Objects.requireNonNull(select);
+    CarbonTable carbonTable = metaProcessor.getTable(select.getTable());
+    return select(
+        carbonTable,
+        select.getProjection(),
+        select.getFilter(),
+        select.getLimit(),
+        select.getLimit());
+  }
+
+  /**
+   * Execute search by firing RPC call to worker, return the result rows
+   *
+   * @param table       table to search
+   * @param columns     projection column names
+   * @param filter      filter expression
+   * @param globalLimit max number of rows required in Master
+   * @param localLimit  max number of rows required in Worker
+   * @return CarbonRow
+   */
+  private List<CarbonRow> select(CarbonTable table, String[] columns, 
Expression filter,
+      long globalLimit, long localLimit) throws IOException {
+    Objects.requireNonNull(table);
+    Objects.requireNonNull(columns);
+    if (globalLimit < 0 || localLimit < 0) {
+      throw new IllegalArgumentException("limit should be positive");
+    }
+
+    int queryId = random.nextInt();
+
+    List<CarbonRow> output = new ArrayList<>();
+
+    // prune data and get a mapping of worker hostname to list of blocks,
+    // then add these blocks to the Scan and fire the RPC call
+    List<Distributable> blockInfos = pruneBlock(table, columns, filter);
+
+    Map<String, List<Distributable>> nodeBlockMapping =
+        CarbonLoaderUtil.nodeBlockMapping(
+            blockInfos, -1, scheduler.getAllWorkerAddresses(),
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
+
+    Set<Map.Entry<String, List<Distributable>>> entries = 
nodeBlockMapping.entrySet();
+    List<Future<QueryResponse>> futures = new ArrayList<>(entries.size());
+    List<Schedulable> workers = new ArrayList<>(entries.size());
+    for (Map.Entry<String, List<Distributable>> entry : entries) {
+      CarbonMultiBlockSplit split = new 
CarbonMultiBlockSplit(entry.getValue(), entry.getKey());
+      Scan scan =
+          new Scan(queryId, split, table.getTableInfo(), columns, filter, 
localLimit);
+
+      // Find an Endpoind and send the request to it
+      // This RPC is non-blocking so that we do not need to wait before send 
to next worker
+      Schedulable worker = scheduler.pickWorker(entry.getKey());
+      workers.add(worker);
+      futures.add(scheduler.sendRequestAsync(worker, scan));
+    }
+
+    int rowCount = 0;
+    int length = futures.size();
+    for (int i = 0; i < length; i++) {
+      Future<QueryResponse> future = futures.get(i);
+      Schedulable worker = workers.get(i);
+      if (rowCount < globalLimit) {
+        // wait for worker
+        QueryResponse response = null;
+        try {
+          response = future
+              .get((long) (CarbonProperties.getInstance().getQueryTimeout()), 
TimeUnit.SECONDS);
+        } catch (ExecutionException | InterruptedException e) {
+          throw new IOException("exception in worker: " + e.getMessage());
+        } catch (TimeoutException t) {
+          throw new ExecutionTimeoutException();
+        } finally {
+          worker.workload.decrementAndGet();
+        }
+        LOGGER.info("[QueryId: " + queryId + "] receive search response from 
worker " + worker);
+        rowCount += onSuccess(queryId, response, output, globalLimit);
+      }
+    }
+    return output;
+  }
+
+  private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> 
output, long globalLimit)
+      throws IOException {
+    // in case of RPC success, collect all rows in response message
+    if (result.getQueryId() != queryId) {
+      throw new IOException(
+          "queryId in response does not match request: " + result.getQueryId() 
+ " != " + queryId);
+    }
+    if (result.getStatus() != Status.SUCCESS.ordinal()) {
+      throw new IOException("failure in worker: " + result.getMessage());
+    }
+    int rowCount = 0;
+    Object[][] rows = result.getRows();
+    for (Object[] row : rows) {
+      output.add(new CarbonRow(row));
+      rowCount++;
+      if (rowCount >= globalLimit) {
+        break;
+      }
+    }
+    LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + 
rowCount);
+    return rowCount;
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOGGER.info("Shutting down all workers...");
+    scheduler.stopAllWorkers();
+    LOGGER.info("All workers are shut down");
+    try {
+      LOGGER.info("Stopping master...");
+      scheduler.stopService();
+      LOGGER.info("Master stopped");
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
new file mode 100644
index 0000000..64f0742
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
+import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a special RecordReader that leverages FGDataMap before reading 
carbondata file
+ * and return CarbonRow object
+ */
+class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(IndexedRecordReader.class.getName());
+
+  private int queryId;
+  private CarbonTable table;
+
+  public IndexedRecordReader(int queryId, CarbonTable table, QueryModel 
queryModel) {
+    super(queryModel, new CarbonRowReadSupport());
+    this.queryId = queryId;
+    this.table = table;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit;
+    List<CarbonInputSplit> splits =  mbSplit.getAllSplits();
+    List<TableBlockInfo> list = 
CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
+    queryModel.setTableBlockInfos(list);
+
+    // prune the block with FGDataMap is there is one based on the filter 
condition
+    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
+        queryModel.getFilterExpressionResolverTree());
+    if (fgDataMap != null) {
+      queryModel = prune(table, queryModel, mbSplit, fgDataMap);
+    } else {
+      List<TableBlockInfo> tableBlockInfoList = 
CarbonInputSplit.createBlocks(splits);
+      queryModel.setTableBlockInfos(tableBlockInfoList);
+    }
+
+    readSupport.initialize(queryModel.getProjectionColumns(), 
queryModel.getTable());
+    try {
+      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
+    } catch (QueryExecutionException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+
+  private DataMapExprWrapper chooseFGDataMap(
+      CarbonTable table,
+      FilterResolverIntf filterInterface) {
+    DataMapChooser chooser = null;
+    try {
+      chooser = new DataMapChooser(table);
+      return chooser.chooseFGDataMap(filterInterface);
+    } catch (IOException e) {
+      LOG.error(e);
+      return null;
+    }
+  }
+
+  /**
+   * If there is FGDataMap defined for this table and filter condition in the 
query,
+   * prune the splits by the DataMap and set the pruned split into the 
QueryModel and return
+   */
+  private QueryModel prune(CarbonTable table, QueryModel queryModel,
+      CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws 
IOException {
+    Objects.requireNonNull(datamap);
+    List<Segment> segments = new LinkedList<>();
+    HashMap<String, Integer> uniqueSegments = new HashMap<>();
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(
+            CarbonTablePath.getMetadataPath(table.getTablePath()));
+    for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+      String segmentId = Segment.getSegment(split.getSegmentId(), 
loadMetadataDetails).toString();
+      if (uniqueSegments.get(segmentId) == null) {
+        segments.add(Segment.toSegment(segmentId,
+            new 
TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                loadMetadataDetails)));
+        uniqueSegments.put(segmentId, 1);
+      } else {
+        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+      }
+    }
+
+    List<DataMapDistributableWrapper> distributables = 
datamap.toDistributable(segments);
+    List<ExtendedBlocklet> prunnedBlocklets = new 
LinkedList<ExtendedBlocklet>();
+    for (int i = 0; i < distributables.size(); i++) {
+      DataMapDistributable dataMapDistributable = 
distributables.get(i).getDistributable();
+      prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
+    }
+
+    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+    for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
+      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
+    }
+
+    List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+    List<TableBlockInfo> blockToRead = new LinkedList<>();
+    for (TableBlockInfo block : blocks) {
+      if (pathToRead.keySet().contains(block.getFilePath())) {
+        // If not set this, it won't create FineGrainBlocklet object in
+        // 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
+        
block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
+        blockToRead.add(block);
+      }
+    }
+    LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned 
blocks: %d", queryId,
+        blockToRead.size()));
+    queryModel.setTableBlockInfos(blockToRead);
+    return queryModel;
+  }
+
+  @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = 
queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : 
columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+
+    // close read support
+    readSupport.close();
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
new file mode 100644
index 0000000..40b6bdf
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import 
org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
+import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
+import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
/**
 * A CarbonStore implementation that works locally, without other compute framework dependency.
 * It can be used to read data in local disk.
 */
// NOTE(review): the two annotations below were mangled in the archived diff;
// reconstructed from the imports — verify against upstream.
@InterfaceAudience.Internal
@InterfaceStability.Unstable
class LocalCarbonStore extends CarbonStoreBase {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(LocalCarbonStore.class.getName());

  // store-level configuration (paths, temp locations)
  private StoreConf storeConf;
  // Hadoop configuration used for input-format setup; defensively copied in the constructor
  private Configuration configuration;
  // manages segment open/commit/close around each load
  private SegmentTxnManager txnManager;

  LocalCarbonStore(StoreConf storeConf) {
    this(storeConf, new Configuration());
  }

  LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) {
    super(storeConf);
    this.storeConf = storeConf;
    this.txnManager = SegmentTxnManager.getInstance();
    this.configuration = new Configuration(hadoopConf);
  }

  /**
   * Loads data described by the descriptor into the target table inside a
   * segment transaction: the segment is opened before the load, committed on
   * success, and marked failed if any step throws.
   *
   * @param load descriptor with target table, input path and load options
   * @throws IOException on metadata access failure
   * @throws StoreException if the load options are invalid or the load fails
   */
  @Override
  public void loadData(LoadDescriptor load) throws IOException, StoreException {
    Objects.requireNonNull(load);
    CarbonTable table = metaProcessor.getTable(load.getTable());
    CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
    modelBuilder.setInputPath(load.getInputPath());
    CarbonLoadModel loadModel;
    try {
      loadModel = modelBuilder.build(load.getOptions(), System.currentTimeMillis(), "0");
    } catch (InvalidLoadOptionException e) {
      LOGGER.error(e, "Invalid loadDescriptor options");
      throw new StoreException(e.getMessage());
    }

    if (loadModel.getFactTimeStamp() == 0) {
      loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
    }

    try {
      txnManager.openSegment(loadModel, load.isOverwrite());
      loadData(loadModel);
      txnManager.commitSegment(loadModel);
    } catch (Exception e) {
      // mark the segment failed before surfacing the error
      txnManager.closeSegment(loadModel);
      LOGGER.error(e, "Failed to load data");
      throw new StoreException(e);
    }
  }

  /**
   * Runs the actual CSV load: sets up a local Hadoop task attempt context,
   * builds one CSV record-reader iterator per input split, and executes the
   * data load. Unsafe memory for the task is cleared in the finally block.
   */
  private void loadData(CarbonLoadModel model) throws Exception {
    DataLoadExecutor executor = null;
    try {
      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
      // NOTE(review): return value discarded — presumably called for a side
      // effect; confirm whether this call is needed at all.
      CarbonInputFormatUtil.createJobTrackerID(new Date());
      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
      StoreUtil.configureCSVInputFormat(configuration, model);
      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
      // Set up the attempt context required to use in the output committer.
      TaskAttemptContext hadoopAttemptContext =
          new TaskAttemptContextImpl(configuration, taskAttemptId);

      CSVInputFormat format = new CSVInputFormat();
      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);

      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
      for (int index = 0; index < splits.size(); index++) {
        readerIterators[index] = new CSVRecordReaderIterator(
            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
            hadoopAttemptContext);
      }

      executor = new DataLoadExecutor();
      executor.execute(model, storeConf.storeTempLocation(), readerIterators);
    } finally {
      if (executor != null) {
        executor.close();
        // release unsafe memory owned by this load task
        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
      }
    }
  }

  /**
   * Executes a select: prunes blocks for the table using projection/filter,
   * wraps them in a single multi-block split and scans them locally.
   *
   * @param select descriptor with table, projection, filter and limit
   * @return the selected rows
   */
  @Override
  public List<CarbonRow> select(SelectDescriptor select) throws IOException {
    Objects.requireNonNull(select);
    CarbonTable table = metaProcessor.getTable(select.getTable());
    List<Distributable> blocks = pruneBlock(table, select.getProjection(), select.getFilter());
    CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, "");
    Scan scan = new Scan(
        0, split, table.getTableInfo(), select.getProjection(), select.getFilter(),
        select.getLimit());
    return scan(table, scan);
  }

  // No resources are held by this local store; nothing to release.
  @Override
  public void close() throws IOException {

  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java 
b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
new file mode 100644
index 0000000..6d03711
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.store.api.descriptor.TableDescriptor;
+import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.store.api.exception.StoreException;
+
+class MetaProcessor {
+
+  private static LogService LOGGER =
+      LogServiceFactory.getLogService(MetaProcessor.class.getCanonicalName());
+
+  private CarbonStoreBase store;
+
+  MetaProcessor(CarbonStoreBase store) {
+    this.store = store;
+  }
+
+  // mapping of table path to CarbonTable object
+  private Map<String, CarbonTable> cache = new HashMap<>();
+
+  public void createTable(TableDescriptor descriptor) throws StoreException {
+    Field[] fields = descriptor.getSchema().getFields();
+    // sort_columns
+    List<String> sortColumnsList = null;
+    try {
+      sortColumnsList = 
descriptor.getSchema().prepareSortColumns(descriptor.getProperties());
+    } catch (MalformedCarbonCommandException e) {
+      throw new StoreException(e.getMessage());
+    }
+    ColumnSchema[] sortColumnsSchemaList = new 
ColumnSchema[sortColumnsList.size()];
+
+    TableSchemaBuilder builder = TableSchema.builder();
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, 
sortColumnsSchemaList);
+
+    TableSchema schema = 
builder.tableName(descriptor.getTable().getTableName())
+        .properties(descriptor.getProperties())
+        .setSortColumns(Arrays.asList(sortColumnsSchemaList))
+        .build();
+
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    
schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(descriptor.getTable().getTableName());
+
+    String tablePath = descriptor.getTablePath();
+    if (tablePath == null) {
+      tablePath = store.getTablePath(
+          descriptor.getTable().getTableName(), 
descriptor.getTable().getDatabaseName());
+    }
+
+    TableInfo tableInfo = CarbonTable.builder()
+        .databaseName(descriptor.getTable().getDatabaseName())
+        .tableName(descriptor.getTable().getTableName())
+        .tablePath(tablePath)
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .buildTableInfo();
+
+    try {
+      createTable(tableInfo, descriptor.isIfNotExists());
+    } catch (IOException e) {
+      LOGGER.error(e, "create tableDescriptor failed");
+      throw new StoreException(e.getMessage());
+    }
+  }
+
+  private void createTable(TableInfo tableInfo, boolean ifNotExists) throws 
IOException {
+    AbsoluteTableIdentifier identifier = 
tableInfo.getOrCreateAbsoluteTableIdentifier();
+    boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
+    if (tableExists) {
+      if (ifNotExists) {
+        return;
+      } else {
+        throw new IOException(
+            "car't create table " + tableInfo.getDatabaseName() + "." + 
tableInfo.getFactTable()
+                .getTableName() + ", because it already exists");
+      }
+    }
+
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    String databaseName = tableInfo.getDatabaseName();
+    String tableName = tableInfo.getFactTable().getTableName();
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, 
databaseName, tableName);
+
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+    String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath);
+    FileFactory.FileType fileType = 
FileFactory.getFileType(schemaMetadataPath);
+    try {
+      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, 
fileType);
+        if (!isDirCreated) {
+          throw new IOException("Failed to create the metadata directory " + 
schemaMetadataPath);
+        }
+      }
+      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+      thriftWriter.open(FileWriteOperation.OVERWRITE);
+      thriftWriter.write(thriftTableInfo);
+      thriftWriter.close();
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle create table");
+      throw e;
+    }
+  }
+
+  public void dropTable(TableIdentifier table) throws IOException {
+    String tablePath = store.getTablePath(table.getTableName(), 
table.getDatabaseName());
+    cache.remove(tablePath);
+    FileFactory.deleteFile(tablePath);
+  }
+
+  public CarbonTable getTable(TableIdentifier table) throws IOException {
+    String tablePath = store.getTablePath(table.getTableName(), 
table.getDatabaseName());
+    if (cache.containsKey(tablePath)) {
+      return cache.get(tablePath);
+    } else {
+      org.apache.carbondata.format.TableInfo formatTableInfo =
+          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+          formatTableInfo, table.getDatabaseName(), table.getTableName(), 
tablePath);
+      tableInfo.setTablePath(tablePath);
+      CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      cache.put(tablePath, carbonTable);
+      return carbonTable;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java
new file mode 100644
index 0000000..7be9b74
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
/**
 * Singleton that manages the table-status transaction around a data load:
 * opening a new segment before loading, committing it on success, or marking
 * it failed when the load aborts.
 */
class SegmentTxnManager {

  // NOTE(review): instance (non-static) logger on a singleton — harmless, but
  // could be static final.
  private LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
  private static final SegmentTxnManager instance = new SegmentTxnManager();

  static SegmentTxnManager getInstance() {
    return instance;
  }

  /**
   * Records a new in-progress segment in the table metadata before the load
   * starts (overwriting existing segments when isOverwriteTable is true).
   *
   * @throws IOException if the table metadata cannot be updated
   */
  void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException {
    try {
      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable);
    } catch (IOException e) {
      LOGGER.error(e, "Failed to handle load data");
      throw e;
    }
  }

  /**
   * Marks the segment as failed in the table status; called when a load aborts.
   *
   * @throws IOException if the table status cannot be updated
   */
  void closeSegment(CarbonLoadModel loadModel) throws IOException {
    try {
      CarbonLoaderUtil.updateTableStatusForFailure(loadModel, "");
    } catch (IOException e) {
      LOGGER.error(e, "Failed to close segment");
      throw e;
    }
  }

  /**
   * Commits a loaded segment: writes the segment file, then, under the
   * table-status lock, marks the matching load entry SUCCESS, records its
   * data/index sizes, and rewrites the table status file.
   *
   * NOTE(review): if the lock cannot be acquired within the retry budget the
   * commit is silently skipped (only an error is logged) — confirm whether an
   * exception should be thrown instead.
   *
   * @throws IOException if the segment entry is missing or status write fails
   */
  void commitSegment(CarbonLoadModel loadModel) throws IOException {
    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
    String segmentId = loadModel.getSegmentId();
    // persist the segment file first; its name is recorded in the status entry below
    String segmentFileName = SegmentFileStore
        .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp()));

    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
    String tablePath = absoluteTableIdentifier.getTablePath();
    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);

    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    // lock retry/timeout come from configurable carbon properties
    int retryCount = CarbonLockUtil
        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
    int maxTimeout = CarbonLockUtil
        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
    try {
      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
            SegmentStatusManager.readLoadMetadata(metadataPath);
        LoadMetadataDetails loadMetadataDetails = null;
        // if the segments is in the list of marked for delete then update the status.
        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
          if (segmentId.equals(detail.getLoadName())) {
            loadMetadataDetails = detail;
            detail.setSegmentFile(segmentFileName);
            break;
          }
        }
        if (loadMetadataDetails == null) {
          throw new IOException("can not find segment: " + segmentId);
        }

        CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS,
            loadModel.getFactTimeStamp(), true);
        CarbonLoaderUtil
            .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable);

        SegmentStatusManager
            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
      } else {
        LOGGER.error(
            "Not able to acquire the lock for Table status updation for table path " + tablePath);
      }
    } finally {
      // NOTE(review): unlock() is attempted even when the lock was never
      // acquired; the else branch logs the failure in that case.
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
      } else {
        LOGGER.error(
            "Unable to unlock Table lock for table" + tablePath + " during table status updation");
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java 
b/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java
new file mode 100644
index 0000000..ab4caf3
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
/**
 * Status of RPC response
 */
// NOTE(review): annotation reconstructed from the mangled diff line — verify.
@InterfaceAudience.Internal
public enum Status {
  // WARNING: ordinal() of these constants is carried in RPC response messages
  // (responses compare getStatus() against Status.SUCCESS.ordinal()), so the
  // declaration order SUCCESS=0, FAILURE=1 is part of the wire contract.
  // Do not reorder or insert constants.
  SUCCESS, FAILURE
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/Master.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/master/Master.java 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Master.java
new file mode 100644
index 0000000..59d4aa3
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Master.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.master;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.impl.rpc.RegistryService;
+import org.apache.carbondata.store.impl.rpc.ServiceFactory;
+import org.apache.carbondata.store.impl.rpc.StoreService;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * Master of CarbonSearch.
+ * It provides a Registry service for worker to register.
+ */
+class Master {
+
+  private static Master instance = null;
+
+  private static LogService LOGGER = 
LogServiceFactory.getLogService(Master.class.getName());
+
+  // worker host address map to EndpointRef
+  private StoreConf conf;
+  private Configuration hadoopConf;
+  private RPC.Server registryServer = null;
+
+  // mapping of worker IP address to worker instance
+  Map<String, Schedulable> workers = new ConcurrentHashMap<>();
+
+  private Master(StoreConf conf) {
+    this.conf = conf;
+    this.hadoopConf = conf.newHadoopConf();
+  }
+
+  /**
+   * start service and listen on port passed in constructor
+   */
+  public void startService() throws IOException {
+    if (registryServer == null) {
+
+      BindException exception;
+      // we will try to create service at worse case 100 times
+      int numTry = 100;
+      String host = conf.masterHost();
+      int port = conf.masterPort();
+      LOGGER.info("building registry-service on " + host + ":" + port);
+
+      RegistryService registryService = new RegistryServiceImpl(this);
+      do {
+        try {
+          registryServer = new RPC.Builder(hadoopConf)
+              .setBindAddress(host)
+              .setPort(port)
+              .setProtocol(RegistryService.class)
+              .setInstance(registryService)
+              .build();
+
+          registryServer.start();
+          numTry = 0;
+          exception = null;
+        } catch (BindException e) {
+          // port is occupied, increase the port number and try again
+          exception = e;
+          LOGGER.error(e, "start registry-service failed");
+          port = port + 1;
+          numTry = numTry - 1;
+        }
+      } while (numTry > 0);
+      if (exception != null) {
+        // we have tried many times, but still failed to find an available port
+        throw exception;
+      }
+      LOGGER.info("registry-service started");
+    } else {
+      LOGGER.info("Search mode master has already started");
+    }
+  }
+
+  public void stopService() throws InterruptedException {
+    if (registryServer != null) {
+      registryServer.stop();
+      registryServer.join();
+      registryServer = null;
+    }
+  }
+
+  /**
+   * A new searcher is trying to register, add it to the map and connect to 
this searcher
+   */
+  public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) 
throws IOException {
+    LOGGER.info(
+        "Receive Register request from worker " + request.getHostAddress() + 
":" + request.getPort()
+            + " with " + request.getCores() + " cores");
+    String workerId = UUID.randomUUID().toString();
+    String workerAddress = request.getHostAddress();
+    int workerPort = request.getPort();
+    LOGGER.info(
+        "connecting to worker " + request.getHostAddress() + ":" + 
request.getPort() + ", workerId "
+            + workerId);
+
+    StoreService searchService = 
ServiceFactory.createStoreService(workerAddress, workerPort);
+    addWorker(
+        new Schedulable(workerId, workerAddress, workerPort, 
request.getCores(), searchService));
+    LOGGER.info("worker " + request + " registered");
+    return new RegisterWorkerResponse(workerId);
+  }
+
+  /**
+   * A new searcher is trying to register, add it to the map and connect to 
this searcher
+   */
+  private void addWorker(Schedulable schedulable) {
+    workers.put(schedulable.getAddress(), schedulable);
+  }
+
+  public static synchronized Master getInstance(StoreConf conf) {
+    if (instance == null) {
+      instance = new Master(conf);
+    }
+    return instance;
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length != 2) {
+      System.err.println("Usage: Master <log4j file> <properties file>");
+      return;
+    }
+
+    StoreUtil.initLog4j(args[0]);
+    StoreConf conf = new StoreConf(args[1]);
+    Master master = getInstance(conf);
+    master.stopService();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
new file mode 100644
index 0000000..492006b
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.master;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.impl.rpc.RegistryService;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.ProtocolSignature;
+
[email protected]
+class RegistryServiceImpl implements RegistryService {
+
+  private Master master;
+
+  RegistryServiceImpl(Master master) {
+    this.master = master;
+  }
+
+  @Override
+  public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) 
throws IOException {
+    return master.addWorker(request);
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws 
IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long 
clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java
new file mode 100644
index 0000000..57d41f0
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.master;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.impl.rpc.StoreService;
+
[email protected]
+public class Schedulable {
+
+  private String id;
+  private String address;
+  private int port;
+  private int cores;
+  public StoreService service;
+  public AtomicInteger workload;
+
+  public Schedulable(String id, String address, int port, int cores, 
StoreService service) {
+    this.id = id;
+    this.address = address;
+    this.port = port;
+    this.cores = cores;
+    this.service = service;
+    this.workload = new AtomicInteger();
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public void setAddress(String address) {
+    this.address = address;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  int getCores() {
+    return cores;
+  }
+
+  @Override public String toString() {
+    return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + 
'\'' + ", port=" + port
+        + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java
new file mode 100644
index 0000000..96a1375
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.store.api.conf.StoreConf;
+import org.apache.carbondata.store.api.exception.SchedulerException;
+import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
+import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
+
+/**
+ * [[Master]] uses Scheduler to pick a Worker to send request
+ */
[email protected]
+public class Scheduler {
+
+  private static LogService LOGGER = 
LogServiceFactory.getLogService(Scheduler.class.getName());
+
+  private AtomicInteger count = new AtomicInteger(0);
+  private ExecutorService executors = Executors.newCachedThreadPool();
+  private Master master;
+
+  public Scheduler(StoreConf storeConf) throws IOException {
+    master = Master.getInstance(storeConf);
+    master.startService();
+  }
+
+  /**
+   * Pick a Worker according to the address and workload of the Worker
+   * Invoke the RPC and return Future result
+   */
+  public Future<QueryResponse> sendRequestAsync(
+      final Schedulable worker, final Scan scan) {
+    LOGGER.info("sending search request to worker " + worker);
+    worker.workload.incrementAndGet();
+    return executors.submit(new Callable<QueryResponse>() {
+      @Override public QueryResponse call() {
+        return worker.service.query(scan);
+      }
+    });
+  }
+
+  public BaseResponse sendRequest(final Schedulable worker,
+      final LoadDataRequest request) {
+
+    LOGGER.info("sending load data request to worker " + worker);
+    worker.workload.incrementAndGet();
+    return worker.service.loadData(request);
+  }
+
+  public Schedulable pickWorker(String splitAddress) {
+    Schedulable worker = master.workers.get(splitAddress);
+    // no local worker available, choose one worker randomly
+    if (worker == null) {
+      worker = pickNexWorker();
+    }
+    // check whether worker exceed max workload, if exceeded, pick next worker
+    int maxWorkload = 
CarbonProperties.getMaxWorkloadForWorker(worker.getCores());
+    int numTry = master.workers.size();
+    do {
+      if (worker.workload.get() >= maxWorkload) {
+        LOGGER.info("worker " + worker + " reach limit, re-select worker...");
+        worker = pickNexWorker();
+        numTry = numTry - 1;
+      } else {
+        numTry = -1;
+      }
+    } while (numTry > 0);
+    if (numTry == 0) {
+      // tried so many times and still not able to find Worker
+      throw new SchedulerException(
+          "All workers are busy, number of workers: " + master.workers.size() +
+              ", workload limit: " + maxWorkload);
+    }
+
+    return worker;
+  }
+
+  public Schedulable pickNexWorker() {
+    if (master.workers.size() == 0) {
+      throw new SchedulerException("No worker is available");
+    }
+    int index = count.getAndIncrement() % master.workers.size();
+    return new ArrayList<>(master.workers.values()).get(index);
+  }
+
+  public void stopAllWorkers() throws IOException {
+    for (Map.Entry<String, Schedulable> entry : master.workers.entrySet()) {
+      try {
+        entry.getValue().service.shutdown(new ShutdownRequest("user"));
+      } catch (Throwable throwable) {
+        throw new IOException(throwable);
+      }
+      master.workers.remove(entry.getKey());
+    }
+  }
+
+  public void stopService() throws InterruptedException {
+    master.stopService();
+  }
+
+  public List<String> getAllWorkerAddresses() {
+    return new ArrayList<>(master.workers.keySet());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/RegistryService.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/RegistryService.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/RegistryService.java
new file mode 100644
index 0000000..a40f741
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/RegistryService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/**
 * RPC protocol a worker uses to register itself with the master.
 */
@InterfaceAudience.Internal
public interface RegistryService extends VersionedProtocol {
  // protocol version reported via VersionedProtocol.getProtocolVersion
  long versionID = 1L;

  /**
   * Register a worker with the master.
   *
   * @param request the worker's host address, port and core count
   * @return response carrying the id the master assigned to the worker
   * @throws IOException on RPC failure
   */
  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException;
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/ServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/ServiceFactory.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/ServiceFactory.java
new file mode 100644
index 0000000..852f14f
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/ServiceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
[email protected]
+public class ServiceFactory {
+
+  public static StoreService createStoreService(String host, int port) throws 
IOException {
+    InetSocketAddress address = new 
InetSocketAddress(InetAddress.getByName(host), port);
+    return RPC.getProxy(
+        StoreService.class, StoreService.versionID, address, new 
Configuration());
+  }
+
+  public static RegistryService createRegistryService(String host, int port) 
throws IOException {
+    InetSocketAddress address = new 
InetSocketAddress(InetAddress.getByName(host), port);
+    return RPC.getProxy(
+        RegistryService.class, RegistryService.versionID, address, new 
Configuration());
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java
new file mode 100644
index 0000000..36702d9
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
+import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
+import org.apache.carbondata.store.impl.rpc.model.Scan;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
+import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
/**
 * RPC protocol exposed by a worker: data loading, query scanning and shutdown.
 */
@InterfaceAudience.Internal
public interface StoreService extends VersionedProtocol {

  // protocol version reported via VersionedProtocol.getProtocolVersion
  long versionID = 1L;

  /** Load the data described by the request into the store. */
  BaseResponse loadData(LoadDataRequest request);

  /** Execute the scan and return the matching rows. */
  QueryResponse query(Scan scan);

  /** Ask the worker to shut down. */
  ShutdownResponse shutdown(ShutdownRequest request);
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java
new file mode 100644
index 0000000..b11aa69
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
[email protected]
+public class BaseResponse implements Serializable, Writable {
+  private int status;
+  private String message;
+
+  public BaseResponse() {
+  }
+
+  public BaseResponse(int status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(status);
+    out.writeUTF(message);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    message = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java
----------------------------------------------------------------------
diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java
 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java
new file mode 100644
index 0000000..e6c8048
--- /dev/null
+++ 
b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.rpc.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.store.util.StoreUtil;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class LoadDataRequest implements Serializable, Writable {
+
+  private CarbonLoadModel model;
+
+  public LoadDataRequest() {
+  }
+
+  public LoadDataRequest(CarbonLoadModel model) {
+    this.model = model;
+  }
+
+  public CarbonLoadModel getModel() {
+    return model;
+  }
+
+  public void setModel(CarbonLoadModel model) {
+    this.model = model;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model));
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] bytes = WritableUtils.readCompressedByteArray(in);
+    model = (CarbonLoadModel) StoreUtil.deserialize(bytes);
+  }
+}

Reply via email to