http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java deleted file mode 100644 index da2a697..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/conf/StoreConf.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.conf; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; - -public class StoreConf implements Serializable, Writable { - - public static final String SELECT_PROJECTION = "carbon.select.projection"; - public static final String SELECT_FILTER = "carbon.select.filter"; - public static final String SELECT_LIMIT = "carbon.select.limit"; - - public static final String SELECT_ID = "carbon.select.id"; - - public static final String WORKER_HOST = "carbon.worker.host"; - public static final String WORKER_PORT = "carbon.worker.port"; - public static final String WORKER_CORE_NUM = "carbon.worker.core.num"; - public static final String MASTER_HOST = "carbon.master.host"; - public static final String MASTER_PORT = "carbon.master.port"; - - public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location"; - public static final String STORE_LOCATION = "carbon.store.location"; - - private Map<String, String> conf = new HashMap<>(); - - public StoreConf() { - } - - public StoreConf(String filePath) { - load(filePath); - } - - public StoreConf conf(String key, String value) { - conf.put(key, value); - return this; - } - - public StoreConf conf(String key, int value) { - conf.put(key, "" + value); - return this; - } - - public void load(String filePath) { - StoreUtil.loadProperties(filePath, this); - } - - public void conf(StoreConf conf) { - this.conf.putAll(conf.conf); - } - - public Object conf(String key) { - return conf.get(key); - } - - public String[] projection() { - return stringArrayValue(SELECT_PROJECTION); - } - - public String filter() { 
- return stringValue(SELECT_FILTER); - } - - public int limit() { - return intValue(SELECT_LIMIT); - } - - public String masterHost() { - return stringValue(MASTER_HOST); - } - - public int masterPort() { - return intValue(MASTER_PORT); - } - - public String workerHost() { - return stringValue(WORKER_HOST); - } - - public int workerPort() { - return intValue(WORKER_PORT); - } - - public int workerCoreNum() { - return intValue(WORKER_CORE_NUM); - } - - public String storeLocation() { - return stringValue(STORE_LOCATION); - } - - public String[] storeTempLocation() { - return stringArrayValue(STORE_TEMP_LOCATION); - } - - public String selectId() { - return stringValue(SELECT_ID); - } - - public Configuration newHadoopConf() { - Configuration hadoopConf = FileFactory.getConfiguration(); - for (Map.Entry<String, String> entry : conf.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key != null && value != null && key.startsWith("carbon.hadoop.")) { - hadoopConf.set(key.substring("carbon.hadoop.".length()), value); - } - } - return hadoopConf; - } - - private String stringValue(String key) { - Object obj = conf.get(key); - if (obj == null) { - return null; - } - return obj.toString(); - } - - private int intValue(String key) { - String value = conf.get(key); - if (value == null) { - return -1; - } - return Integer.parseInt(value); - } - - private String[] stringArrayValue(String key) { - String value = conf.get(key); - if (value == null) { - return null; - } - return value.split(",", -1); - } - - @Override public void write(DataOutput out) throws IOException { - Set<Map.Entry<String, String>> entries = conf.entrySet(); - WritableUtils.writeVInt(out, conf.size()); - for (Map.Entry<String, String> entry : entries) { - WritableUtils.writeString(out, entry.getKey()); - WritableUtils.writeString(out, entry.getValue()); - } - } - - @Override public void readFields(DataInput in) throws IOException { - if (conf == null) { - conf = new 
HashMap<>(); - } - - int size = WritableUtils.readVInt(in); - String key, value; - for (int i = 0; i < size; i++) { - key = WritableUtils.readString(in); - value = WritableUtils.readString(in); - conf.put(key, value); - } - } -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java deleted file mode 100644 index c7a4d6b..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/exception/ExecutionTimeoutException.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.exception; - -public class ExecutionTimeoutException extends RuntimeException { - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java deleted file mode 100644 index c55fa7c..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/exception/StoreException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.exception; - -public class StoreException extends Exception { - - public StoreException() { - super(); - } - - public StoreException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java b/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java deleted file mode 100644 index b366a67..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/exception/WorkerTooBusyException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.exception; - -public class WorkerTooBusyException extends RuntimeException { - - public WorkerTooBusyException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java new file mode 100644 index 0000000..7e50102 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.block.Distributable; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.model.QueryModelBuilder; +import org.apache.carbondata.core.util.CarbonTaskInfo; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.store.api.CarbonStore; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.impl.rpc.model.Scan; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; + +/** + * Provides base functionality of CarbonStore, it contains basic implementation of metadata + * management, data pruning and data scan logic. 
+ */ [email protected] +public abstract class CarbonStoreBase implements CarbonStore { + + private static LogService LOGGER = + LogServiceFactory.getLogService(CarbonStoreBase.class.getCanonicalName()); + + MetaProcessor metaProcessor; + private StoreConf storeConf; + + CarbonStoreBase(StoreConf storeConf) { + this.storeConf = storeConf; + this.metaProcessor = new MetaProcessor(this); + } + + @Override + public void createTable(TableDescriptor table) throws IOException, StoreException { + Objects.requireNonNull(table); + metaProcessor.createTable(table); + } + + @Override + public void dropTable(TableIdentifier table) throws IOException { + Objects.requireNonNull(table); + metaProcessor.dropTable(table); + } + + @Override + public CarbonTable getTable(TableIdentifier table) throws IOException { + Objects.requireNonNull(table); + return metaProcessor.getTable(table); + } + + public String getTablePath(String tableName, String databaseName) { + Objects.requireNonNull(tableName); + Objects.requireNonNull(databaseName); + return String.format("%s/%s", storeConf.storeLocation(), tableName); + } + + /** + * Prune data by using CarbonInputFormat.getSplit + * Return a mapping of host address to list of block. + * This should be invoked in driver side. 
+ */ + public static List<Distributable> pruneBlock(CarbonTable table, String[] columns, + Expression filter) throws IOException { + Objects.requireNonNull(table); + Objects.requireNonNull(columns); + JobConf jobConf = new JobConf(new Configuration()); + Job job = new Job(jobConf); + CarbonTableInputFormat format; + try { + format = CarbonInputFormatUtil.createCarbonTableInputFormat( + job, table, columns, filter, null, null, true); + } catch (InvalidConfigurationException e) { + throw new IOException(e.getMessage()); + } + + // We will do FG pruning in reader side, so don't do it here + CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false); + List<InputSplit> splits = format.getSplits(job); + List<Distributable> blockInfos = new ArrayList<>(splits.size()); + for (InputSplit split : splits) { + blockInfos.add((Distributable) split); + } + return blockInfos; + } + + /** + * Scan data and return matched rows. This should be invoked in worker side. + * @param table carbon table + * @param scan scan parameter + * @return matched rows + * @throws IOException if IO error occurs + */ + public static List<CarbonRow> scan(CarbonTable table, Scan scan) throws IOException { + CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo(); + carbonTaskInfo.setTaskId(System.nanoTime()); + ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo); + + CarbonMultiBlockSplit mbSplit = scan.getSplit(); + long limit = scan.getLimit(); + QueryModel queryModel = createQueryModel(table, scan); + + LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", scan.getRequestId(), + queryModel.toString(), mbSplit.getAllSplits().size())); + + // read all rows by the reader + List<CarbonRow> rows = new LinkedList<>(); + try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(scan.getRequestId(), + table, queryModel)) { + reader.initialize(mbSplit, null); + + // loop to read required number of rows. 
+ // By default, if user does not specify the limit value, limit is Long.MaxValue + long rowCount = 0; + while (reader.nextKeyValue() && rowCount < limit) { + rows.add(reader.getCurrentValue()); + rowCount++; + } + } catch (InterruptedException e) { + throw new IOException(e); + } + LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows", + scan.getRequestId(), rows.size())); + return rows; + } + + private static QueryModel createQueryModel(CarbonTable table, Scan scan) { + String[] projectColumns = scan.getProjectColumns(); + Expression filter = null; + if (scan.getFilterExpression() != null) { + filter = scan.getFilterExpression(); + } + return new QueryModelBuilder(table) + .projectColumns(projectColumns) + .filterExpression(filter) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java new file mode 100644 index 0000000..3667aea --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.block.Distributable; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import 
org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import org.apache.carbondata.store.api.exception.ExecutionTimeoutException; +import org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.impl.master.Schedulable; +import org.apache.carbondata.store.impl.master.Scheduler; +import org.apache.carbondata.store.impl.rpc.model.BaseResponse; +import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.impl.rpc.model.QueryResponse; +import org.apache.carbondata.store.impl.rpc.model.Scan; + +/** + * A CarbonStore that leverage multiple servers via RPC calls (Master and Workers) + */ [email protected] [email protected] +class DistributedCarbonStore extends CarbonStoreBase { + private static LogService LOGGER = + LogServiceFactory.getLogService(DistributedCarbonStore.class.getCanonicalName()); + private SegmentTxnManager txnManager; + private Scheduler scheduler; + private Random random = new Random(); + + DistributedCarbonStore(StoreConf storeConf) throws IOException { + super(storeConf); + this.scheduler = new Scheduler(storeConf); + txnManager = SegmentTxnManager.getInstance(); + } + + @Override + public void loadData(LoadDescriptor load) throws IOException, StoreException { + Objects.requireNonNull(load); + CarbonTable table = metaProcessor.getTable(load.getTable()); + CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table); + builder.setInputPath(load.getInputPath()); + CarbonLoadModel loadModel; + try { + loadModel = builder.build(load.getOptions(), System.currentTimeMillis(), "0"); + } catch (InvalidLoadOptionException e) { + LOGGER.error(e, "Invalid loadDescriptor options"); + throw new StoreException(e); + } catch (IOException e) { + LOGGER.error(e, "Failed to loadDescriptor data"); + throw e; + } + + Schedulable worker = scheduler.pickNexWorker(); + try { + if (loadModel.getFactTimeStamp() == 0) { + loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime()); 
+ } + txnManager.openSegment(loadModel, load.isOverwrite()); + LoadDataRequest request = new LoadDataRequest(loadModel); + BaseResponse response = scheduler.sendRequest(worker, request); + if (Status.SUCCESS.ordinal() == response.getStatus()) { + txnManager.commitSegment(loadModel); + } else { + txnManager.closeSegment(loadModel); + throw new StoreException(response.getMessage()); + } + } finally { + worker.workload.decrementAndGet(); + } + } + + @Override + public List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException { + Objects.requireNonNull(select); + CarbonTable carbonTable = metaProcessor.getTable(select.getTable()); + return select( + carbonTable, + select.getProjection(), + select.getFilter(), + select.getLimit(), + select.getLimit()); + } + + /** + * Execute search by firing RPC call to worker, return the result rows + * + * @param table table to search + * @param columns projection column names + * @param filter filter expression + * @param globalLimit max number of rows required in Master + * @param localLimit max number of rows required in Worker + * @return CarbonRow + */ + private List<CarbonRow> select(CarbonTable table, String[] columns, Expression filter, + long globalLimit, long localLimit) throws IOException { + Objects.requireNonNull(table); + Objects.requireNonNull(columns); + if (globalLimit < 0 || localLimit < 0) { + throw new IllegalArgumentException("limit should be positive"); + } + + int queryId = random.nextInt(); + + List<CarbonRow> output = new ArrayList<>(); + + // prune data and get a mapping of worker hostname to list of blocks, + // then add these blocks to the Scan and fire the RPC call + List<Distributable> blockInfos = pruneBlock(table, columns, filter); + + Map<String, List<Distributable>> nodeBlockMapping = + CarbonLoaderUtil.nodeBlockMapping( + blockInfos, -1, scheduler.getAllWorkerAddresses(), + CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null); + + Set<Map.Entry<String, 
List<Distributable>>> entries = nodeBlockMapping.entrySet(); + List<Future<QueryResponse>> futures = new ArrayList<>(entries.size()); + List<Schedulable> workers = new ArrayList<>(entries.size()); + for (Map.Entry<String, List<Distributable>> entry : entries) { + CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey()); + Scan scan = + new Scan(queryId, split, table.getTableInfo(), columns, filter, localLimit); + + // Find an Endpoind and send the request to it + // This RPC is non-blocking so that we do not need to wait before send to next worker + Schedulable worker = scheduler.pickWorker(entry.getKey()); + workers.add(worker); + futures.add(scheduler.sendRequestAsync(worker, scan)); + } + + int rowCount = 0; + int length = futures.size(); + for (int i = 0; i < length; i++) { + Future<QueryResponse> future = futures.get(i); + Schedulable worker = workers.get(i); + if (rowCount < globalLimit) { + // wait for worker + QueryResponse response = null; + try { + response = future + .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS); + } catch (ExecutionException | InterruptedException e) { + throw new IOException("exception in worker: " + e.getMessage()); + } catch (TimeoutException t) { + throw new ExecutionTimeoutException(); + } finally { + worker.workload.decrementAndGet(); + } + LOGGER.info("[QueryId: " + queryId + "] receive search response from worker " + worker); + rowCount += onSuccess(queryId, response, output, globalLimit); + } + } + return output; + } + + private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit) + throws IOException { + // in case of RPC success, collect all rows in response message + if (result.getQueryId() != queryId) { + throw new IOException( + "queryId in response does not match request: " + result.getQueryId() + " != " + queryId); + } + if (result.getStatus() != Status.SUCCESS.ordinal()) { + throw new IOException("failure in 
worker: " + result.getMessage()); + } + int rowCount = 0; + Object[][] rows = result.getRows(); + for (Object[] row : rows) { + output.add(new CarbonRow(row)); + rowCount++; + if (rowCount >= globalLimit) { + break; + } + } + LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + rowCount); + return rowCount; + } + + @Override + public void close() throws IOException { + LOGGER.info("Shutting down all workers..."); + scheduler.stopAllWorkers(); + LOGGER.info("All workers are shut down"); + try { + LOGGER.info("Stopping master..."); + scheduler.stopService(); + LOGGER.info("Master stopped"); + } catch (InterruptedException e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java new file mode 100644 index 0000000..64f0742 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport; + +import 
org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a special RecordReader that leverages FGDataMap before reading carbondata file + * and return CarbonRow object + */ +class IndexedRecordReader extends CarbonRecordReader<CarbonRow> { + + private static final LogService LOG = + LogServiceFactory.getLogService(IndexedRecordReader.class.getName()); + + private int queryId; + private CarbonTable table; + + public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) { + super(queryModel, new CarbonRowReadSupport()); + this.queryId = queryId; + this.table = table; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + CarbonMultiBlockSplit mbSplit = (CarbonMultiBlockSplit) inputSplit; + List<CarbonInputSplit> splits = mbSplit.getAllSplits(); + List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits()); + queryModel.setTableBlockInfos(list); + + // prune the block with FGDataMap is there is one based on the filter condition + DataMapExprWrapper fgDataMap = chooseFGDataMap(table, + queryModel.getFilterExpressionResolverTree()); + if (fgDataMap != null) { + queryModel = prune(table, queryModel, mbSplit, fgDataMap); + } else { + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splits); + queryModel.setTableBlockInfos(tableBlockInfoList); + } + + readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); + try { + carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); + } catch (QueryExecutionException e) { + throw new InterruptedException(e.getMessage()); + } + } + + private DataMapExprWrapper chooseFGDataMap( + CarbonTable table, + FilterResolverIntf filterInterface) { + DataMapChooser chooser = null; + try { + chooser = new DataMapChooser(table); + return chooser.chooseFGDataMap(filterInterface); + } catch 
(IOException e) { + LOG.error(e); + return null; + } + } + + /** + * If there is FGDataMap defined for this table and filter condition in the query, + * prune the splits by the DataMap and set the pruned split into the QueryModel and return + */ + private QueryModel prune(CarbonTable table, QueryModel queryModel, + CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { + Objects.requireNonNull(datamap); + List<Segment> segments = new LinkedList<>(); + HashMap<String, Integer> uniqueSegments = new HashMap<>(); + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); + for (CarbonInputSplit split : mbSplit.getAllSplits()) { + String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString(); + if (uniqueSegments.get(segmentId) == null) { + segments.add(Segment.toSegment(segmentId, + new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(), + loadMetadataDetails))); + uniqueSegments.put(segmentId, 1); + } else { + uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); + } + } + + List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments); + List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>(); + for (int i = 0; i < distributables.size(); i++) { + DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable(); + prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null)); + } + + HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>(); + for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) { + pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet); + } + + List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); + List<TableBlockInfo> blockToRead = new LinkedList<>(); + for (TableBlockInfo block : blocks) { + if (pathToRead.keySet().contains(block.getFilePath())) { + // If not set this, it won't create 
FineGrainBlocklet object in + // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData + block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath()); + blockToRead.add(block); + } + } + LOG.info(String.format("[QueryId:%d] pruned using FG DataMap, pruned blocks: %d", queryId, + blockToRead.size())); + queryModel.setTableBlockInfos(blockToRead); + return queryModel; + } + + @Override public void close() throws IOException { + logStatistics(rowCount, queryModel.getStatisticsRecorder()); + // clear dictionary cache + Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); + if (null != columnToDictionaryMapping) { + for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) { + CarbonUtil.clearDictionaryCache(entry.getValue()); + } + } + + // close read support + readSupport.close(); + try { + queryExecutor.finish(); + } catch (QueryExecutionException e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java new file mode 100644 index 0000000..40b6bdf --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.block.Distributable; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.descriptor.LoadDescriptor; +import org.apache.carbondata.store.api.descriptor.SelectDescriptor; +import 
org.apache.carbondata.store.api.exception.StoreException; +import org.apache.carbondata.store.impl.rpc.model.Scan; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * A CarbonStore implementation that works locally, without other compute framework dependency. + * It can be used to read data in local disk. + */ [email protected] [email protected] +class LocalCarbonStore extends CarbonStoreBase { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalCarbonStore.class.getName()); + + private StoreConf storeConf; + private Configuration configuration; + private SegmentTxnManager txnManager; + + LocalCarbonStore(StoreConf storeConf) { + this(storeConf, new Configuration()); + } + + LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) { + super(storeConf); + this.storeConf = storeConf; + this.txnManager = SegmentTxnManager.getInstance(); + this.configuration = new Configuration(hadoopConf); + } + + @Override + public void loadData(LoadDescriptor load) throws IOException, StoreException { + Objects.requireNonNull(load); + CarbonTable table = metaProcessor.getTable(load.getTable()); + CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table); + modelBuilder.setInputPath(load.getInputPath()); + CarbonLoadModel loadModel; + try { + loadModel = modelBuilder.build(load.getOptions(), System.currentTimeMillis(), "0"); + } catch (InvalidLoadOptionException e) { + LOGGER.error(e, "Invalid loadDescriptor options"); + throw new StoreException(e.getMessage()); + } 
+ + if (loadModel.getFactTimeStamp() == 0) { + loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime()); + } + + try { + txnManager.openSegment(loadModel, load.isOverwrite()); + loadData(loadModel); + txnManager.commitSegment(loadModel); + } catch (Exception e) { + txnManager.closeSegment(loadModel); + LOGGER.error(e, "Failed to load data"); + throw new StoreException(e); + } + } + + private void loadData(CarbonLoadModel model) throws Exception { + DataLoadExecutor executor = null; + try { + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + CarbonInputFormatUtil.createJobTrackerID(new Date()); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0); + StoreUtil.configureCSVInputFormat(configuration, model); + configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath()); + // Set up the attempt context required to use in the output committer. + TaskAttemptContext hadoopAttemptContext = + new TaskAttemptContextImpl(configuration, taskAttemptId); + + CSVInputFormat format = new CSVInputFormat(); + List<InputSplit> splits = format.getSplits(hadoopAttemptContext); + + CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()]; + for (int index = 0; index < splits.size(); index++) { + readerIterators[index] = new CSVRecordReaderIterator( + format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index), + hadoopAttemptContext); + } + + executor = new DataLoadExecutor(); + executor.execute(model, storeConf.storeTempLocation(), readerIterators); + } finally { + if (executor != null) { + executor.close(); + StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); + } + } + } + + @Override + public List<CarbonRow> select(SelectDescriptor select) throws IOException { + Objects.requireNonNull(select); + CarbonTable table = metaProcessor.getTable(select.getTable()); + List<Distributable> blocks = pruneBlock(table, 
select.getProjection(), select.getFilter()); + CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, ""); + Scan scan = new Scan( + 0, split, table.getTableInfo(), select.getProjection(), select.getFilter(), + select.getLimit()); + return scan(table, scan); + } + + @Override + public void close() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java new file mode 100644 index 0000000..6d03711 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.TableSchema; +import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.ThriftWriter; +import org.apache.carbondata.sdk.file.CarbonWriterBuilder; +import org.apache.carbondata.sdk.file.Field; +import org.apache.carbondata.store.api.descriptor.TableDescriptor; +import org.apache.carbondata.store.api.descriptor.TableIdentifier; +import org.apache.carbondata.store.api.exception.StoreException; + +class MetaProcessor { + + private static LogService LOGGER = + LogServiceFactory.getLogService(MetaProcessor.class.getCanonicalName()); + + private CarbonStoreBase store; + + MetaProcessor(CarbonStoreBase store) { + this.store = store; + } + + // mapping of table path to CarbonTable object + private Map<String, CarbonTable> cache 
= new HashMap<>(); + + public void createTable(TableDescriptor descriptor) throws StoreException { + Field[] fields = descriptor.getSchema().getFields(); + // sort_columns + List<String> sortColumnsList = null; + try { + sortColumnsList = descriptor.getSchema().prepareSortColumns(descriptor.getProperties()); + } catch (MalformedCarbonCommandException e) { + throw new StoreException(e.getMessage()); + } + ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()]; + + TableSchemaBuilder builder = TableSchema.builder(); + CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList); + + TableSchema schema = builder.tableName(descriptor.getTable().getTableName()) + .properties(descriptor.getProperties()) + .setSortColumns(Arrays.asList(sortColumnsSchemaList)) + .build(); + + SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry(); + schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis()); + schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry); + schema.setTableName(descriptor.getTable().getTableName()); + + String tablePath = descriptor.getTablePath(); + if (tablePath == null) { + tablePath = store.getTablePath( + descriptor.getTable().getTableName(), descriptor.getTable().getDatabaseName()); + } + + TableInfo tableInfo = CarbonTable.builder() + .databaseName(descriptor.getTable().getDatabaseName()) + .tableName(descriptor.getTable().getTableName()) + .tablePath(tablePath) + .tableSchema(schema) + .isTransactionalTable(true) + .buildTableInfo(); + + try { + createTable(tableInfo, descriptor.isIfNotExists()); + } catch (IOException e) { + LOGGER.error(e, "create tableDescriptor failed"); + throw new StoreException(e.getMessage()); + } + } + + private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException { + AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier(); + boolean tableExists = 
FileFactory.isFileExist(identifier.getTablePath()); + if (tableExists) { + if (ifNotExists) { + return; + } else { + throw new IOException( + "car't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable() + .getTableName() + ", because it already exists"); + } + } + + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + String databaseName = tableInfo.getDatabaseName(); + String tableName = tableInfo.getFactTable().getTableName(); + org.apache.carbondata.format.TableInfo thriftTableInfo = + schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName); + + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); + String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); + FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); + try { + if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { + boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType); + if (!isDirCreated) { + throw new IOException("Failed to create the metadata directory " + schemaMetadataPath); + } + } + ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); + thriftWriter.open(FileWriteOperation.OVERWRITE); + thriftWriter.write(thriftTableInfo); + thriftWriter.close(); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle create table"); + throw e; + } + } + + public void dropTable(TableIdentifier table) throws IOException { + String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName()); + cache.remove(tablePath); + FileFactory.deleteFile(tablePath); + } + + public CarbonTable getTable(TableIdentifier table) throws IOException { + String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName()); + if (cache.containsKey(tablePath)) { + return cache.get(tablePath); + } else { + org.apache.carbondata.format.TableInfo formatTableInfo = + 
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath)); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo( + formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath); + tableInfo.setTablePath(tablePath); + CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + cache.put(tablePath, carbonTable); + return carbonTable; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java b/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java new file mode 100644 index 0000000..7be9b74 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/SegmentTxnManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl; + +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.locks.CarbonLockUtil; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +class SegmentTxnManager { + + private LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName()); + private static final SegmentTxnManager instance = new SegmentTxnManager(); + + static SegmentTxnManager getInstance() { + return instance; + } + + void openSegment(CarbonLoadModel loadModel, boolean isOverwriteTable) throws IOException { + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, isOverwriteTable); + } catch (IOException e) { + LOGGER.error(e, "Failed to handle load data"); + throw e; + } + } + + void closeSegment(CarbonLoadModel loadModel) throws IOException { + try { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel, ""); + } catch (IOException e) { + LOGGER.error(e, "Failed to close segment"); + throw e; + } + } + + void commitSegment(CarbonLoadModel loadModel) throws IOException { + CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + String segmentId = loadModel.getSegmentId(); + String 
segmentFileName = SegmentFileStore + .writeSegmentFile(carbonTable, segmentId, String.valueOf(loadModel.getFactTimeStamp())); + + AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); + String tablePath = absoluteTableIdentifier.getTablePath(); + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); + + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + try { + if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { + LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = + SegmentStatusManager.readLoadMetadata(metadataPath); + LoadMetadataDetails loadMetadataDetails = null; + for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { + // if the segments is in the list of marked for delete then update the status. 
+ if (segmentId.equals(detail.getLoadName())) { + loadMetadataDetails = detail; + detail.setSegmentFile(segmentFileName); + break; + } + } + if (loadMetadataDetails == null) { + throw new IOException("can not find segment: " + segmentId); + } + + CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonLoaderUtil + .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, segmentId, carbonTable); + + SegmentStatusManager + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + } else { + LOGGER.error( + "Not able to acquire the lock for Table status updation for table path " + tablePath); + } + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + tablePath); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + tablePath + " during table status updation"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java b/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java new file mode 100644 index 0000000..ab4caf3 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/Status.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Status of RPC response
 */
@InterfaceAudience.Internal
public enum Status {
  // SUCCESS: the RPC request completed normally; FAILURE: it did not
  SUCCESS, FAILURE
}
+ */ + +package org.apache.carbondata.store.impl.master; + +import java.io.IOException; +import java.net.BindException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.impl.rpc.RegistryService; +import org.apache.carbondata.store.impl.rpc.ServiceFactory; +import org.apache.carbondata.store.impl.rpc.StoreService; +import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + +/** + * Master of CarbonSearch. + * It provides a Registry service for worker to register. + */ +class Master { + + private static Master instance = null; + + private static LogService LOGGER = LogServiceFactory.getLogService(Master.class.getName()); + + // worker host address map to EndpointRef + private StoreConf conf; + private Configuration hadoopConf; + private RPC.Server registryServer = null; + + // mapping of worker IP address to worker instance + Map<String, Schedulable> workers = new ConcurrentHashMap<>(); + + private Master(StoreConf conf) { + this.conf = conf; + this.hadoopConf = conf.newHadoopConf(); + } + + /** + * start service and listen on port passed in constructor + */ + public void startService() throws IOException { + if (registryServer == null) { + + BindException exception; + // we will try to create service at worse case 100 times + int numTry = 100; + String host = conf.masterHost(); + int port = conf.masterPort(); + LOGGER.info("building registry-service on " + host + ":" + port); + + RegistryService registryService = new RegistryServiceImpl(this); + do { + try { + registryServer = new 
RPC.Builder(hadoopConf) + .setBindAddress(host) + .setPort(port) + .setProtocol(RegistryService.class) + .setInstance(registryService) + .build(); + + registryServer.start(); + numTry = 0; + exception = null; + } catch (BindException e) { + // port is occupied, increase the port number and try again + exception = e; + LOGGER.error(e, "start registry-service failed"); + port = port + 1; + numTry = numTry - 1; + } + } while (numTry > 0); + if (exception != null) { + // we have tried many times, but still failed to find an available port + throw exception; + } + LOGGER.info("registry-service started"); + } else { + LOGGER.info("Search mode master has already started"); + } + } + + public void stopService() throws InterruptedException { + if (registryServer != null) { + registryServer.stop(); + registryServer.join(); + registryServer = null; + } + } + + /** + * A new searcher is trying to register, add it to the map and connect to this searcher + */ + public RegisterWorkerResponse addWorker(RegisterWorkerRequest request) throws IOException { + LOGGER.info( + "Receive Register request from worker " + request.getHostAddress() + ":" + request.getPort() + + " with " + request.getCores() + " cores"); + String workerId = UUID.randomUUID().toString(); + String workerAddress = request.getHostAddress(); + int workerPort = request.getPort(); + LOGGER.info( + "connecting to worker " + request.getHostAddress() + ":" + request.getPort() + ", workerId " + + workerId); + + StoreService searchService = ServiceFactory.createStoreService(workerAddress, workerPort); + addWorker( + new Schedulable(workerId, workerAddress, workerPort, request.getCores(), searchService)); + LOGGER.info("worker " + request + " registered"); + return new RegisterWorkerResponse(workerId); + } + + /** + * A new searcher is trying to register, add it to the map and connect to this searcher + */ + private void addWorker(Schedulable schedulable) { + workers.put(schedulable.getAddress(), schedulable); + } + + 
public static synchronized Master getInstance(StoreConf conf) { + if (instance == null) { + instance = new Master(conf); + } + return instance; + } + + public static void main(String[] args) throws InterruptedException { + if (args.length != 2) { + System.err.println("Usage: Master <log4j file> <properties file>"); + return; + } + + StoreUtil.initLog4j(args[0]); + StoreConf conf = new StoreConf(args[1]); + Master master = getInstance(conf); + master.stopService(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java new file mode 100644 index 0000000..492006b --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.master; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.impl.rpc.RegistryService; +import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse; + +import org.apache.hadoop.ipc.ProtocolSignature; + [email protected] +class RegistryServiceImpl implements RegistryService { + + private Master master; + + RegistryServiceImpl(Master master) { + this.master = master; + } + + @Override + public RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException { + return master.addWorker(request); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java new file mode 100644 index 0000000..57d41f0 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Schedulable.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.master; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.impl.rpc.StoreService; + [email protected] +public class Schedulable { + + private String id; + private String address; + private int port; + private int cores; + public StoreService service; + public AtomicInteger workload; + + public Schedulable(String id, String address, int port, int cores, StoreService service) { + this.id = id; + this.address = address; + this.port = port; + this.cores = cores; + this.service = service; + this.workload = new AtomicInteger(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + int getCores() { + return cores; + } + + @Override public String toString() { + return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java 
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java new file mode 100644 index 0000000..96a1375 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/master/Scheduler.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.store.api.conf.StoreConf; +import org.apache.carbondata.store.api.exception.SchedulerException; +import org.apache.carbondata.store.impl.rpc.model.BaseResponse; +import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.impl.rpc.model.QueryResponse; +import org.apache.carbondata.store.impl.rpc.model.Scan; +import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; + +/** + * [[Master]] uses Scheduler to pick a Worker to send request + */ [email protected] +public class Scheduler { + + private static LogService LOGGER = LogServiceFactory.getLogService(Scheduler.class.getName()); + + private AtomicInteger count = new AtomicInteger(0); + private ExecutorService executors = Executors.newCachedThreadPool(); + private Master master; + + public Scheduler(StoreConf storeConf) throws IOException { + master = Master.getInstance(storeConf); + master.startService(); + } + + /** + * Pick a Worker according to the address and workload of the Worker + * Invoke the RPC and return Future result + */ + public Future<QueryResponse> sendRequestAsync( + final Schedulable worker, final Scan scan) { + LOGGER.info("sending search request to worker " + worker); + worker.workload.incrementAndGet(); + return executors.submit(new Callable<QueryResponse>() { + @Override public QueryResponse call() 
{ + return worker.service.query(scan); + } + }); + } + + public BaseResponse sendRequest(final Schedulable worker, + final LoadDataRequest request) { + + LOGGER.info("sending load data request to worker " + worker); + worker.workload.incrementAndGet(); + return worker.service.loadData(request); + } + + public Schedulable pickWorker(String splitAddress) { + Schedulable worker = master.workers.get(splitAddress); + // no local worker available, choose one worker randomly + if (worker == null) { + worker = pickNexWorker(); + } + // check whether worker exceed max workload, if exceeded, pick next worker + int maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.getCores()); + int numTry = master.workers.size(); + do { + if (worker.workload.get() >= maxWorkload) { + LOGGER.info("worker " + worker + " reach limit, re-select worker..."); + worker = pickNexWorker(); + numTry = numTry - 1; + } else { + numTry = -1; + } + } while (numTry > 0); + if (numTry == 0) { + // tried so many times and still not able to find Worker + throw new SchedulerException( + "All workers are busy, number of workers: " + master.workers.size() + + ", workload limit: " + maxWorkload); + } + + return worker; + } + + public Schedulable pickNexWorker() { + if (master.workers.size() == 0) { + throw new SchedulerException("No worker is available"); + } + int index = count.getAndIncrement() % master.workers.size(); + return new ArrayList<>(master.workers.values()).get(index); + } + + public void stopAllWorkers() throws IOException { + for (Map.Entry<String, Schedulable> entry : master.workers.entrySet()) { + try { + entry.getValue().service.shutdown(new ShutdownRequest("user")); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + master.workers.remove(entry.getKey()); + } + } + + public void stopService() throws InterruptedException { + master.stopService(); + } + + public List<String> getAllWorkerAddresses() { + return new ArrayList<>(master.workers.keySet()); + } +} + 
package org.apache.carbondata.store.impl.rpc;

import java.io.IOException;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;

import org.apache.hadoop.ipc.VersionedProtocol;

/**
 * Hadoop RPC protocol exposed by the Master so that workers can
 * register themselves on startup.
 */
@InterfaceAudience.Internal
public interface RegistryService extends VersionedProtocol {
  // protocol version used for Hadoop RPC version negotiation
  long versionID = 1L;

  /**
   * Register a worker with the master.
   *
   * @param request the worker's host, port and core count
   * @return response carrying the worker id assigned by the master
   * @throws IOException on RPC failure
   */
  RegisterWorkerResponse registerWorker(RegisterWorkerRequest request) throws IOException;
}
+ */ + +package org.apache.carbondata.store.impl.rpc; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + [email protected] +public class ServiceFactory { + + public static StoreService createStoreService(String host, int port) throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); + return RPC.getProxy( + StoreService.class, StoreService.versionID, address, new Configuration()); + } + + public static RegistryService createRegistryService(String host, int port) throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host), port); + return RPC.getProxy( + RegistryService.class, RegistryService.versionID, address, new Configuration()); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java new file mode 100644 index 0000000..36702d9 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/StoreService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.rpc; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.impl.rpc.model.BaseResponse; +import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.impl.rpc.model.QueryResponse; +import org.apache.carbondata.store.impl.rpc.model.Scan; +import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse; + +import org.apache.hadoop.ipc.VersionedProtocol; + [email protected] +public interface StoreService extends VersionedProtocol { + + long versionID = 1L; + + BaseResponse loadData(LoadDataRequest request); + + QueryResponse query(Scan scan); + + ShutdownResponse shutdown(ShutdownRequest request); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java new file mode 100644 index 0000000..b11aa69 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/BaseResponse.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + [email protected] +public class BaseResponse implements Serializable, Writable { + private int status; + private String message; + + public BaseResponse() { + } + + public BaseResponse(int status, String message) { + this.status = status; + this.message = message; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(status); + out.writeUTF(message); + } + + @Override + public void readFields(DataInput in) throws IOException { + status = in.readInt(); + message = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/85cdc404/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java new file mode 100644 index 0000000..e6c8048 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/impl/rpc/model/LoadDataRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.impl.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class LoadDataRequest implements Serializable, Writable { + + private CarbonLoadModel model; + + public LoadDataRequest() { + } + + public LoadDataRequest(CarbonLoadModel model) { + this.model = model; + } + + public CarbonLoadModel getModel() { + return model; + } + + public void setModel(CarbonLoadModel model) { + this.model = model; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model)); + } + + @Override + public void readFields(DataInput in) throws IOException { + byte[] bytes = WritableUtils.readCompressedByteArray(in); + model = (CarbonLoadModel) StoreUtil.deserialize(bytes); + } +}
