[CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API. This closes #2589
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9f10122a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9f10122a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9f10122a Branch: refs/heads/carbonstore Commit: 9f10122af65fcd518bfe8855c4eaa1a423e81caa Parents: a6027ae Author: Jacky Li <jacky.li...@qq.com> Authored: Wed Aug 1 02:16:26 2018 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Mon Aug 6 19:03:12 2018 +0800 ---------------------------------------------------------------------- .../common/annotations/InterfaceStability.java | 2 +- .../core/datastore/row/CarbonRow.java | 30 ++- .../core/metadata/schema/table/TableInfo.java | 2 +- .../carbondata/hadoop/CarbonInputSplit.java | 44 +++- .../hadoop/api/CarbonInputFormat.java | 28 ++- .../hadoop/api/CarbonTableInputFormat.java | 11 + .../hadoop/readsupport/CarbonReadSupport.java | 3 +- .../apache/carbondata/store/WorkerManager.scala | 4 +- .../org/apache/spark/sql/CarbonSession.scala | 22 +- .../carbondata/zeppelin/TestCarbonResponse.java | 4 +- store/conf/store.conf | 4 +- store/core/pom.xml | 4 +- .../carbondata/store/api/CarbonStore.java | 32 --- .../store/api/CarbonStoreFactory.java | 93 -------- .../apache/carbondata/store/api/DataStore.java | 51 ---- .../apache/carbondata/store/api/MetaStore.java | 50 ---- .../apache/carbondata/store/api/SqlStore.java | 34 --- .../carbondata/store/api/conf/StoreConf.java | 197 ---------------- .../store/api/descriptor/LoadDescriptor.java | 114 --------- .../store/api/descriptor/SelectDescriptor.java | 111 --------- .../store/api/descriptor/TableDescriptor.java | 174 -------------- .../store/api/descriptor/TableIdentifier.java | 37 --- .../exception/ExecutionTimeoutException.java | 22 -- .../store/api/exception/SchedulerException.java | 26 --- .../store/api/exception/StoreException.java | 33 --- .../carbondata/store/devapi/DataLoader.java | 49 ++++ 
.../carbondata/store/devapi/DataScanner.java | 51 ++++ .../store/devapi/InternalCarbonStore.java | 72 ++++++ .../devapi/InternalCarbonStoreFactory.java | 45 ++++ .../apache/carbondata/store/devapi/Pruner.java | 46 ++++ .../carbondata/store/devapi/ResultBatch.java | 47 ++++ .../carbondata/store/devapi/ScanOption.java | 69 ++++++ .../carbondata/store/devapi/ScanUnit.java | 41 ++++ .../apache/carbondata/store/devapi/Scanner.java | 33 +++ .../store/devapi/TransactionalOperation.java | 35 +++ .../carbondata/store/impl/BlockScanUnit.java | 74 ++++++ .../carbondata/store/impl/CarbonStoreBase.java | 177 -------------- .../carbondata/store/impl/DataOperation.java | 95 ++++++++ .../carbondata/store/impl/DataServicePool.java | 41 ++++ .../carbondata/store/impl/DelegatedScanner.java | 57 +++++ .../store/impl/DistributedCarbonStore.java | 232 ------------------- .../carbondata/store/impl/IndexOperation.java | 61 +++++ .../store/impl/IndexedRecordReader.java | 2 +- .../store/impl/InternalCarbonStoreImpl.java | 110 +++++++++ .../carbondata/store/impl/LocalCarbonStore.java | 95 +++++--- .../carbondata/store/impl/LocalDataScanner.java | 73 ++++++ .../carbondata/store/impl/LocalPruner.java | 58 +++++ .../carbondata/store/impl/MetaOperation.java | 212 +++++++++++++++++ .../carbondata/store/impl/MetaProcessor.java | 170 -------------- .../store/impl/RemoteDataScanner.java | 87 +++++++ .../carbondata/store/impl/RemotePruner.java | 56 +++++ .../store/impl/RowMajorResultBatch.java | 49 ++++ .../carbondata/store/impl/Schedulable.java | 100 ++++++++ .../carbondata/store/impl/master/Master.java | 172 +++++++++++--- .../store/impl/master/PruneServiceImpl.java | 89 +++++++ .../store/impl/master/RegistryServiceImpl.java | 8 +- .../store/impl/master/Schedulable.java | 76 ------ .../carbondata/store/impl/master/Scheduler.java | 71 +----- .../store/impl/master/StoreServiceImpl.java | 100 ++++++++ .../store/impl/rpc/RegistryService.java | 32 --- .../store/impl/rpc/ServiceFactory.java | 43 ---- 
.../carbondata/store/impl/rpc/StoreService.java | 40 ---- .../store/impl/rpc/model/BaseResponse.java | 69 ------ .../store/impl/rpc/model/LoadDataRequest.java | 60 ----- .../store/impl/rpc/model/QueryResponse.java | 73 ------ .../impl/rpc/model/RegisterWorkerRequest.java | 73 ------ .../impl/rpc/model/RegisterWorkerResponse.java | 54 ----- .../carbondata/store/impl/rpc/model/Scan.java | 108 --------- .../store/impl/rpc/model/ShutdownRequest.java | 53 ----- .../store/impl/rpc/model/ShutdownResponse.java | 61 ----- .../store/impl/service/DataService.java | 53 +++++ .../store/impl/service/PruneService.java | 41 ++++ .../store/impl/service/RegistryService.java | 32 +++ .../store/impl/service/ServiceFactory.java | 50 ++++ .../store/impl/service/model/BaseResponse.java | 69 ++++++ .../impl/service/model/LoadDataRequest.java | 60 +++++ .../store/impl/service/model/PruneRequest.java | 64 +++++ .../store/impl/service/model/PruneResponse.java | 67 ++++++ .../service/model/RegisterWorkerRequest.java | 73 ++++++ .../service/model/RegisterWorkerResponse.java | 54 +++++ .../store/impl/service/model/ScanRequest.java | 108 +++++++++ .../store/impl/service/model/ScanResponse.java | 73 ++++++ .../impl/service/model/ShutdownRequest.java | 53 +++++ .../impl/service/model/ShutdownResponse.java | 61 +++++ .../store/impl/worker/DataServiceImpl.java | 174 ++++++++++++++ .../store/impl/worker/RequestHandler.java | 166 ------------- .../store/impl/worker/StoreServiceImpl.java | 77 ------ .../carbondata/store/impl/worker/Worker.java | 26 ++- .../apache/carbondata/store/util/StoreUtil.java | 134 ----------- .../store/DistributedCarbonStoreTest.java | 59 +++-- .../carbondata/store/LocalCarbonStoreTest.java | 39 ++-- .../org/apache/carbondata/store/TestUtil.java | 10 - .../horizon/rest/client/HorizonClient.java | 10 +- .../rest/client/impl/SimpleHorizonClient.java | 12 +- .../horizon/rest/controller/Horizon.java | 3 +- .../rest/controller/HorizonController.java | 68 +++--- 
.../rest/model/validate/RequestValidator.java | 38 +-- .../rest/model/view/CreateTableRequest.java | 4 +- .../horizon/rest/model/view/FieldRequest.java | 2 +- .../horizon/rest/model/view/LoadRequest.java | 4 +- .../horizon/rest/model/view/SelectResponse.java | 11 +- .../apache/carbondata/horizon/HorizonTest.java | 10 +- store/sdk/pom.xml | 4 +- .../carbondata/sdk/file/AvroCarbonWriter.java | 50 ++-- .../sdk/file/CarbonWriterBuilder.java | 4 +- .../org/apache/carbondata/sdk/file/Field.java | 15 +- .../org/apache/carbondata/sdk/file/Schema.java | 3 +- .../carbondata/sdk/store/CarbonStore.java | 151 ++++++++++++ .../sdk/store/CarbonStoreFactory.java | 93 ++++++++ .../sdk/store/DistributedCarbonStore.java | 129 +++++++++++ .../apache/carbondata/sdk/store/KeyedRow.java | 45 ++++ .../apache/carbondata/sdk/store/PrimaryKey.java | 26 +++ .../org/apache/carbondata/sdk/store/Row.java | 27 +++ .../carbondata/sdk/store/conf/StoreConf.java | 207 +++++++++++++++++ .../sdk/store/descriptor/LoadDescriptor.java | 149 ++++++++++++ .../sdk/store/descriptor/ScanDescriptor.java | 151 ++++++++++++ .../sdk/store/descriptor/TableDescriptor.java | 214 +++++++++++++++++ .../sdk/store/descriptor/TableIdentifier.java | 63 +++++ .../sdk/store/exception/CarbonException.java | 38 +++ .../exception/ExecutionTimeoutException.java | 27 +++ .../sdk/store/exception/SchedulerException.java | 31 +++ .../sdk/store/service/ServiceFactory.java | 41 ++++ .../sdk/store/service/StoreService.java | 53 +++++ .../carbondata/sdk/store/util/StoreUtil.java | 134 +++++++++++ .../rest/controller/SqlHorizonController.java | 8 +- .../rest/model/validate/RequestValidator.java | 8 +- 126 files changed, 4912 insertions(+), 3015 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java index 5435028..afd863f 100644 --- a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java +++ b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java @@ -42,7 +42,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience.*; * </ul> */ @InterfaceAudience.User -@org.apache.hadoop.classification.InterfaceStability.Evolving +@InterfaceStability.Evolving public class InterfaceStability { /** * Can evolve while retaining compatibility for minor release boundaries.; http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java index 48775d4..1f1f087 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java @@ -17,13 +17,21 @@ package org.apache.carbondata.core.datastore.row; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; +import org.apache.carbondata.core.metadata.schema.table.Writable; +import org.apache.carbondata.core.util.ObjectSerializationUtil; + +import org.apache.hadoop.io.WritableUtils; + /** * This row class is used to transfer the row data from one step to other step */ -public class CarbonRow implements Serializable { +public class CarbonRow implements Serializable, Writable { private Object[] data; @@ -87,4 +95,24 @@ public 
class CarbonRow implements Serializable { public void clearData() { this.data = null; } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(data)); + WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rawData)); + out.writeShort(rangeId); + } + + @Override + public void readFields(DataInput in) throws IOException { + try { + data = (Object[]) ObjectSerializationUtil.deserialize( + WritableUtils.readCompressedByteArray(in)); + rawData = (Object[]) ObjectSerializationUtil.deserialize( + WritableUtils.readCompressedByteArray(in)); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + rangeId = in.readShort(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java index 46328f7..abe1810 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java @@ -44,7 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTable * Store the information about the table. 
* it stores the fact table as well as aggregate table present in the schema */ -public class TableInfo implements Serializable, Writable { +public class TableInfo implements Serializable, Writable, org.apache.hadoop.io.Writable { private static final LogService LOGGER = LogServiceFactory.getLogService(TableInfo.class.getName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 405ff53..dd3e63f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.internal.index.Block; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -56,6 +57,11 @@ public class CarbonInputSplit extends FileSplit private Segment segment; + // We use this filePath to store the block location instead of the + // filePath in FileSplit, because filePath in FileSplit is not Serializable + // before Hadoop 3, see HADOOP-13519 + private String filePath; + private String bucketId; private String blockletId; @@ -98,6 +104,7 @@ public class CarbonInputSplit extends FileSplit numberOfBlocklets = 0; invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); + filePath = ""; } private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, @@ -116,6 +123,7 @@ public class CarbonInputSplit extends FileSplit this.version = version; this.deleteDeltaFiles = deleteDeltaFiles; 
this.dataMapWritePath = dataMapWritePath; + this.filePath = path.toString(); } public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length, @@ -136,6 +144,7 @@ public class CarbonInputSplit extends FileSplit numberOfBlocklets = 0; invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); + filePath = path.toString(); } public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations, @@ -149,6 +158,7 @@ public class CarbonInputSplit extends FileSplit numberOfBlocklets = 0; invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); + filePath = path.toString(); } /** @@ -252,10 +262,24 @@ public class CarbonInputSplit extends FileSplit if (dataMapWriterPathExists) { dataMapWritePath = in.readUTF(); } + boolean filePathExists = in.readBoolean(); + if (filePathExists) { + filePath = in.readUTF(); + } else { + filePath = super.getPath().toString(); + } } @Override public void write(DataOutput out) throws IOException { - super.write(out); + if (super.getPath() != null) { + super.write(out); + } else { + // see HADOOP-13519, after Java deserialization, super.filePath is + // null, so write our filePath instead + Text.writeString(out, filePath); + out.writeLong(getStart()); + out.writeLong(getLength()); + } out.writeUTF(segment.toString()); out.writeShort(version.number()); out.writeUTF(bucketId); @@ -278,6 +302,10 @@ public class CarbonInputSplit extends FileSplit if (dataMapWritePath != null) { out.writeUTF(dataMapWritePath); } + out.writeBoolean(filePath != null); + if (filePath != null) { + out.writeUTF(filePath); + } } public List<String> getInvalidSegments() { @@ -398,7 +426,7 @@ public class CarbonInputSplit extends FileSplit } @Override public String getBlockPath() { - return getPath().getName(); + return filePath.substring(filePath.lastIndexOf("/") + 1); } @Override public List<Long> getMatchedBlocklets() { @@ -444,4 
+472,16 @@ public class CarbonInputSplit extends FileSplit public Blocklet makeBlocklet() { return new Blocklet(getPath().getName(), blockletId); } + + public String[] preferredLocations() { + if (CarbonProperties.isTaskLocality()) { + try { + return getLocations(); + } catch (IOException e) { + return new String[0]; + } + } else { + return new String[0]; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 70c530f..ce0dc72 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -185,9 +185,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { /** * It sets unresolved filter expression. 
* - * @param configuration - * @para DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); -m filterExpression + * @param configuration Hadoop conf + * @param filterExpression filter expression */ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { if (filterExpression == null) { @@ -245,6 +244,17 @@ m filterExpression return configuration.get(COLUMN_PROJECTION); } + public static String[] getProjectionColumns(Configuration configuration) { + String projectionString = getColumnProjection(configuration); + String[] projectColumns; + if (projectionString != null) { + projectColumns = projectionString.split(","); + } else { + projectColumns = new String[]{}; + } + return projectColumns; + } + public static void setFgDataMapPruning(Configuration configuration, boolean enable) { configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable)); } @@ -353,7 +363,7 @@ m filterExpression */ @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException; - protected Expression getFilterPredicates(Configuration configuration) { + public static Expression getFilterPredicates(Configuration configuration) { try { String filterExprString = configuration.get(FILTER_PREDICATE); if (filterExprString == null) { @@ -524,7 +534,7 @@ m filterExpression return prunedBlocklets; } - private List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job, + public List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job, CarbonTable carbonTable, FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException { ExplainCollector.addPruningInfo(carbonTable.getTableName()); @@ -664,13 +674,7 @@ m filterExpression CarbonTable carbonTable = getOrCreateCarbonTable(configuration); // set projection column in the query model - String projectionString = getColumnProjection(configuration); - String[] projectColumns; - if (projectionString != null) { - projectColumns = projectionString.split(","); - } 
else { - projectColumns = new String[]{}; - } + String[] projectColumns = getProjectionColumns(configuration); QueryModel queryModel = new QueryModelBuilder(carbonTable) .projectColumns(projectColumns) .filterExpression(getFilterPredicates(configuration)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 84e36e3..267ba0f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -71,7 +71,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.JobContextImpl; /** * InputFormat for reading carbondata files with table level metadata support, @@ -98,6 +100,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { private CarbonTable carbonTable; private ReadCommittedScope readCommittedScope; + public CarbonTableInputFormat() { + } + + public CarbonTableInputFormat(Configuration conf) throws IOException { + this.carbonTable = getOrCreateCarbonTable(conf); + this.readCommittedScope = getReadCommitted( + new JobContextImpl(conf, new JobID()), carbonTable.getAbsoluteTableIdentifier()); + } + /** * Get the cached CarbonTable or create it by TableInfo in `configuration` */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java 
---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java index c126e95..78a2e7f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java @@ -17,6 +17,7 @@ package org.apache.carbondata.hadoop.readsupport; import java.io.IOException; +import java.io.Serializable; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -24,7 +25,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; /** * This is the interface to convert data reading from RecordReader to row representation. */ -public interface CarbonReadSupport<T> { +public interface CarbonReadSupport<T> extends Serializable { /** * Initialization if needed based on the projected column list http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala index 7fff2e5..c624702 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.util.CarbonProperties +import 
org.apache.carbondata.sdk.store.CarbonStoreFactory +import org.apache.carbondata.sdk.store.conf.StoreConf import org.apache.carbondata.spark.util.Util -import org.apache.carbondata.store.api.CarbonStoreFactory -import org.apache.carbondata.store.api.conf.StoreConf import org.apache.carbondata.store.impl.worker.Worker /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 6c13955..3fccfef 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -22,9 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ -import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} @@ -35,18 +33,16 @@ import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.profiler.{Profiler, SQLStart} -import org.apache.spark.util.{CarbonReflectionUtils, Utils} +import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import 
org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.sdk.store.{CarbonStore, CarbonStoreFactory} +import org.apache.carbondata.sdk.store.conf.StoreConf +import org.apache.carbondata.sdk.store.descriptor.{ScanDescriptor, TableIdentifier} import org.apache.carbondata.store.WorkerManager -import org.apache.carbondata.store.api.{CarbonStore, CarbonStoreFactory} -import org.apache.carbondata.store.api.conf.StoreConf -import org.apache.carbondata.store.api.descriptor.{SelectDescriptor, TableIdentifier => CTableIdentifier} -import org.apache.carbondata.streaming.CarbonStreamingQueryListener /** * Session implementation for {org.apache.spark.sql.SparkSession} @@ -204,7 +200,7 @@ class CarbonSession(@transient val sc: SparkContext, val storeConf = new StoreConf() storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath) storeConf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress) - storeConf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort) + storeConf.conf(StoreConf.REGISTRY_PORT, CarbonProperties.getSearchMasterPort) storeConf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress) storeConf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort) storeConf.conf(StoreConf.WORKER_CORE_NUM, 2) @@ -241,13 +237,13 @@ class CarbonSession(@transient val sc: SparkContext, maxRows: Option[Long] = None, localMaxRows: Option[Long] = None): DataFrame = { val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - val select = new SelectDescriptor( - new CTableIdentifier(table.getTableName, table.getDatabaseName), + val select = new ScanDescriptor( + new TableIdentifier(table.getTableName, table.getDatabaseName), columns.map(_.name).toArray, if (expr != null) CarbonFilters.transformExpression(expr) else null, localMaxRows.getOrElse(Long.MaxValue) ) - val 
rows = store.select(select).iterator() + val rows = store.scan(select).iterator() val output = new java.util.ArrayList[Row]() val maxRowCount = maxRows.getOrElse(Long.MaxValue) var rowCount = 0 http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java ---------------------------------------------------------------------- diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java index 4b169f4..1f1ce65 100644 --- a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java +++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java @@ -76,7 +76,7 @@ public class TestCarbonResponse { " \"timestamp\": 1531884083849,\n" + " \"status\": 500,\n" + " \"error\": \"Internal Server Error\",\n" + - " \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n" + + " \"exception\": \"org.apache.carbondata.store.api.exception.CarbonException\",\n" + " \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " + "Table or view 'sinka6' already exists in database 'default';\",\n" + " \"path\": \"/table/sql\"\n" + @@ -84,7 +84,7 @@ public class TestCarbonResponse { CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get(); assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " + "Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage()); - assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException()); + assertEquals("org.apache.carbondata.store.api.exception.CarbonException", errorResponse.getException()); assertEquals(1531884083849L, errorResponse.getTimestamp()); assertEquals("Internal Server 
Error", errorResponse.getError()); assertEquals(500, errorResponse.getStatus()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/conf/store.conf ---------------------------------------------------------------------- diff --git a/store/conf/store.conf b/store/conf/store.conf index 7f18076..9902061 100644 --- a/store/conf/store.conf +++ b/store/conf/store.conf @@ -6,5 +6,7 @@ carbon.store.temp.location=/tmp/carbon.store.temp # worker and master carbon.master.host=127.0.0.1 -carbon.master.port=10020 +carbon.master.registry.port=10020 +carbon.master.prune.port=10120 +carbon.master.store.port=9020 carbon.store.location=/tmp/carbon.store \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/pom.xml ---------------------------------------------------------------------- diff --git a/store/core/pom.xml b/store/core/pom.xml index 44d5ab1..c9e498d 100644 --- a/store/core/pom.xml +++ b/store/core/pom.xml @@ -48,8 +48,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java deleted file mode 100644 index 3525389..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api; - -import java.io.Closeable; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; - -/** - * Public Interface of CarbonStore - */ -@InterfaceAudience.User -@InterfaceStability.Unstable -public interface CarbonStore extends MetaStore, DataStore, Closeable { - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java deleted file mode 100644 index 76ef450..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.carbondata.store.api.conf.StoreConf; -import org.apache.carbondata.store.api.exception.StoreException; - -public class CarbonStoreFactory { - private static Map<String, CarbonStore> distributedStores = new ConcurrentHashMap<>(); - private static Map<String, CarbonStore> localStores = new ConcurrentHashMap<>(); - - private CarbonStoreFactory() { - } - - public static CarbonStore getDistributedStore(String storeName, StoreConf storeConf) - throws StoreException { - if (distributedStores.containsKey(storeName)) { - return distributedStores.get(storeName); - } - - // create a new instance - try { - String className = "org.apache.carbondata.store.impl.DistributedCarbonStore"; - CarbonStore store = createCarbonStore(storeConf, className); - distributedStores.put(storeName, store); - return store; - } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | - InstantiationException e) { - throw new StoreException(e); - } - } - - public static void removeDistributedStore(String storeName) throws IOException { - if (distributedStores.containsKey(storeName)) { - distributedStores.get(storeName).close(); - distributedStores.remove(storeName); - } - } - - public static CarbonStore getLocalStore(String storeName, StoreConf storeConf) - throws StoreException { - if 
(localStores.containsKey(storeName)) { - return localStores.get(storeName); - } - - // create a new instance - try { - String className = "org.apache.carbondata.store.impl.LocalCarbonStore"; - CarbonStore store = createCarbonStore(storeConf, className); - localStores.put(storeName, store); - return store; - } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException | - InstantiationException e) { - throw new StoreException(e); - } - } - - public static void removeLocalStore(String storeName) throws IOException { - if (localStores.containsKey(storeName)) { - localStores.get(storeName).close(); - localStores.remove(storeName); - } - } - - private static CarbonStore createCarbonStore(StoreConf storeConf, String className) - throws ClassNotFoundException, InstantiationException, IllegalAccessException, - InvocationTargetException { - Constructor[] constructor = Class.forName(className).getDeclaredConstructors(); - constructor[0].setAccessible(true); - return (CarbonStore) constructor[0].newInstance(storeConf); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java deleted file mode 100644 index d35c133..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api; - -import java.io.IOException; -import java.util.List; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.store.api.descriptor.LoadDescriptor; -import org.apache.carbondata.store.api.descriptor.SelectDescriptor; -import org.apache.carbondata.store.api.exception.StoreException; - -/** - * Public interface to write and read data in CarbonStore - */ -@InterfaceAudience.User -@InterfaceStability.Unstable -public interface DataStore { - - /** - * Load data into a Table - * @param load descriptor for load operation - * @throws IOException if network or disk IO error occurs - */ - void loadData(LoadDescriptor load) throws IOException, StoreException; - - /** - * Scan a Table and return matched rows - * @param select descriptor for scan operation, including required column, filter, etc - * @return matched rows - * @throws IOException if network or disk IO error occurs - */ - List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java deleted file mode 
100644 index dea6873..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api; - -import java.io.IOException; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.store.api.descriptor.TableDescriptor; -import org.apache.carbondata.store.api.descriptor.TableIdentifier; -import org.apache.carbondata.store.api.exception.StoreException; - -/** - * Public interface to manage table in CarbonStore - */ -@InterfaceAudience.User -@InterfaceStability.Unstable -public interface MetaStore { - /** - * Create a Table - * @param table descriptor for create table operation - * @throws IOException if network or disk IO error occurs - */ - void createTable(TableDescriptor table) throws IOException, StoreException; - - /** - * Drop a Table, and remove all data in it - * @param table table identifier - * @throws IOException if network or disk IO error occurs - */ - void dropTable(TableIdentifier table) 
throws IOException; - - CarbonTable getTable(TableIdentifier table) throws IOException; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java deleted file mode 100644 index 3f52eed..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.api; - -import java.io.IOException; -import java.util.List; - -import org.apache.carbondata.core.datastore.row.CarbonRow; - -public interface SqlStore { - - /** - * Executor a SQL statement - * @param sqlString SQL statement - * @return matched rows - * @throws IOException if network or disk IO error occurs - */ - List<CarbonRow> sql(String sqlString) throws IOException; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java deleted file mode 100644 index 5e4bb4a..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.api.conf; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.store.util.StoreUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; - -public class StoreConf implements Serializable, Writable { - - public static final String SELECT_PROJECTION = "carbon.select.projection"; - public static final String SELECT_FILTER = "carbon.select.filter"; - public static final String SELECT_LIMIT = "carbon.select.limit"; - - public static final String SELECT_ID = "carbon.select.id"; - - public static final String WORKER_HOST = "carbon.worker.host"; - public static final String WORKER_PORT = "carbon.worker.port"; - public static final String WORKER_CORE_NUM = "carbon.worker.core.num"; - public static final String MASTER_HOST = "carbon.master.host"; - public static final String MASTER_PORT = "carbon.master.port"; - - public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location"; - public static final String STORE_LOCATION = "carbon.store.location"; - public static final String STORE_NAME = "carbon.store.name"; - - public static final String STORE_CONF_FILE = "carbon.store.confFile"; - - private Map<String, String> conf = new HashMap<>(); - - public StoreConf() { - String storeConfFile = System.getProperty(STORE_CONF_FILE); - if (storeConfFile != null) { - load(storeConfFile); - } - } - - public StoreConf(String storeName, String storeLocation) { - conf.put(STORE_NAME, storeName); - conf.put(STORE_LOCATION, storeLocation); - } - - public StoreConf(String confFilePath) { - load(confFilePath); - } - - public StoreConf conf(String key, String value) { - conf.put(key, value); - return this; - } - - public StoreConf 
conf(String key, int value) { - conf.put(key, "" + value); - return this; - } - - public void load(String filePath) { - StoreUtil.loadProperties(filePath, this); - } - - public void conf(StoreConf conf) { - this.conf.putAll(conf.conf); - } - - public Object conf(String key) { - return conf.get(key); - } - - public String[] projection() { - return stringArrayValue(SELECT_PROJECTION); - } - - public String filter() { - return stringValue(SELECT_FILTER); - } - - public int limit() { - return intValue(SELECT_LIMIT); - } - - public String masterHost() { - return stringValue(MASTER_HOST); - } - - public int masterPort() { - return intValue(MASTER_PORT); - } - - public String workerHost() { - return stringValue(WORKER_HOST); - } - - public int workerPort() { - return intValue(WORKER_PORT); - } - - public int workerCoreNum() { - return intValue(WORKER_CORE_NUM); - } - - public String storeLocation() { - return stringValue(STORE_LOCATION); - } - - public String[] storeTempLocation() { - return stringArrayValue(STORE_TEMP_LOCATION); - } - - public String selectId() { - return stringValue(SELECT_ID); - } - - public Configuration newHadoopConf() { - Configuration hadoopConf = FileFactory.getConfiguration(); - for (Map.Entry<String, String> entry : conf.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key != null && value != null && key.startsWith("carbon.hadoop.")) { - hadoopConf.set(key.substring("carbon.hadoop.".length()), value); - } - } - return hadoopConf; - } - - private String stringValue(String key) { - Object obj = conf.get(key); - if (obj == null) { - return null; - } - return obj.toString(); - } - - private int intValue(String key) { - String value = conf.get(key); - if (value == null) { - return -1; - } - return Integer.parseInt(value); - } - - private String[] stringArrayValue(String key) { - String value = conf.get(key); - if (value == null) { - return null; - } - return value.split(",", -1); - } - - @Override public void 
write(DataOutput out) throws IOException { - Set<Map.Entry<String, String>> entries = conf.entrySet(); - WritableUtils.writeVInt(out, conf.size()); - for (Map.Entry<String, String> entry : entries) { - WritableUtils.writeString(out, entry.getKey()); - WritableUtils.writeString(out, entry.getValue()); - } - } - - @Override public void readFields(DataInput in) throws IOException { - if (conf == null) { - conf = new HashMap<>(); - } - - int size = WritableUtils.readVInt(in); - String key, value; - for (int i = 0; i < size; i++) { - key = WritableUtils.readString(in); - value = WritableUtils.readString(in); - conf.put(key, value); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java deleted file mode 100644 index c3a4ff7..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api.descriptor; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -public class LoadDescriptor { - - private TableIdentifier table; - private String inputPath; - private Map<String, String> options; - private boolean isOverwrite; - - private LoadDescriptor() { - } - - public LoadDescriptor(TableIdentifier table, String inputPath, - Map<String, String> options, boolean isOverwrite) { - Objects.requireNonNull(table); - Objects.requireNonNull(inputPath); - this.table = table; - this.inputPath = inputPath; - this.options = options; - this.isOverwrite = isOverwrite; - } - - public TableIdentifier getTable() { - return table; - } - - public void setTable(TableIdentifier table) { - this.table = table; - } - - public String getInputPath() { - return inputPath; - } - - public void setInputPath(String inputPath) { - this.inputPath = inputPath; - } - - public Map<String, String> getOptions() { - return options; - } - - public void setOptions(Map<String, String> options) { - this.options = options; - } - - public boolean isOverwrite() { - return isOverwrite; - } - - public void setOverwrite(boolean overwrite) { - isOverwrite = overwrite; - } - - public static class Builder { - private LoadDescriptor load; - private Map<String, String> options; - - private Builder() { - load = new LoadDescriptor(); - options = new HashMap<>(); - } - - public Builder table(TableIdentifier tableIdentifier) { - load.setTable(tableIdentifier); - return this; - } - - public Builder overwrite(boolean isOverwrite) { - load.setOverwrite(isOverwrite); - return this; - } - - public Builder inputPath(String inputPath) { - load.setInputPath(inputPath); - return this; - } - - public Builder options(String key, String value) { - options.put(key, value); - return this; - } - - public LoadDescriptor create() { - 
load.setOptions(options); - return load; - } - } - - public static Builder builder() { - return new Builder(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java deleted file mode 100644 index c3627a9..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.api.descriptor; - -import java.util.Objects; - -import org.apache.carbondata.core.scan.expression.Expression; - -public class SelectDescriptor { - - private TableIdentifier table; - private String[] projection; - private Expression filter; - private long limit; - - private SelectDescriptor() { - } - - public SelectDescriptor(TableIdentifier table, String[] projection, - Expression filter, long limit) { - Objects.requireNonNull(table); - Objects.requireNonNull(projection); - this.table = table; - this.projection = projection; - this.filter = filter; - this.limit = limit; - } - - public TableIdentifier getTable() { - return table; - } - - public void setTable(TableIdentifier table) { - this.table = table; - } - - public String[] getProjection() { - return projection; - } - - public void setProjection(String[] projection) { - this.projection = projection; - } - - public Expression getFilter() { - return filter; - } - - public void setFilter(Expression filter) { - this.filter = filter; - } - - public long getLimit() { - return limit; - } - - public void setLimit(long limit) { - this.limit = limit; - } - - public static class Builder { - private SelectDescriptor select; - - private Builder() { - select = new SelectDescriptor(); - } - - public Builder table(TableIdentifier tableIdentifier) { - select.setTable(tableIdentifier); - return this; - } - - public Builder select(String... 
columnNames) { - select.setProjection(columnNames); - return this; - } - - public Builder filter(Expression filter) { - select.setFilter(filter); - return this; - } - - public Builder limit(long limit) { - select.setLimit(limit); - return this; - } - - public SelectDescriptor create() { - return select; - } - } - - public static Builder builder() { - return new Builder(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java deleted file mode 100644 index 2d677a8..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store.api.descriptor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.sdk.file.Field; -import org.apache.carbondata.sdk.file.Schema; - -public class TableDescriptor { - - private boolean ifNotExists; - private TableIdentifier table; - private String tablePath; - private Schema schema; - private Map<String, String> properties; - private String comment; - - private TableDescriptor() { - } - - public TableDescriptor(TableIdentifier table, Schema schema, - Map<String, String> properties, String tablePath, String comment, boolean ifNotExists) { - Objects.requireNonNull(table); - Objects.requireNonNull(schema); - this.table = table; - this.ifNotExists = ifNotExists; - this.schema = schema; - this.properties = properties; - this.tablePath = tablePath; - this.comment = comment; - } - - public boolean isIfNotExists() { - return ifNotExists; - } - - public void setIfNotExists(boolean ifNotExists) { - this.ifNotExists = ifNotExists; - } - - public TableIdentifier getTable() { - return table; - } - - public void setTable(TableIdentifier table) { - this.table = table; - } - - public Schema getSchema() { - return schema; - } - - public void setSchema(Schema schema) { - this.schema = schema; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - public String getComment() { - return comment; - } - - public void setComment(String comment) { - this.comment = comment; - } - - public void setTablePath(String tablePath) { - this.tablePath = tablePath; - } - - public String getTablePath() { - return tablePath; - } - - public static class Builder { - - private TableDescriptor table; - private List<Field> fields; - private Map<String, String> 
tblProperties; - - private Builder() { - table = new TableDescriptor(); - fields = new ArrayList<>(); - tblProperties = new HashMap<>(); - } - - public Builder ifNotExists() { - table.setIfNotExists(true); - return this; - } - - public Builder table(TableIdentifier tableId) { - table.setTable(tableId); - return this; - } - - public Builder tablePath(String tablePath) { - table.setTablePath(tablePath); - return this; - } - - public Builder comment(String tableComment) { - table.setComment(tableComment); - return this; - } - - public Builder column(String name, DataType dataType) { - fields.add(new Field(name, dataType)); - return this; - } - - public Builder column(String name, DataType dataType, String comment) { - Field field = new Field(name, dataType); - field.setColumnComment(comment); - fields.add(field); - return this; - } - - public Builder column(String name, DataType dataType, int precision, int scale, String comment) - { - Field field = new Field(name, dataType); - field.setColumnComment(comment); - field.setScale(scale); - field.setPrecision(precision); - fields.add(field); - return this; - } - - public Builder tblProperties(String key, String value) { - tblProperties.put(key, value); - return this; - } - - public TableDescriptor create() { - Field[] fieldArray = new Field[fields.size()]; - fieldArray = fields.toArray(fieldArray); - Schema schema = new Schema(fieldArray); - table.setSchema(schema); - table.setProperties(tblProperties); - return table; - } - } - - public static Builder builder() { - return new Builder(); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java deleted file mode 
100644 index ab8edf8..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api.descriptor; - -public class TableIdentifier { - private String tableName; - private String databaseName; - - public TableIdentifier(String tableName, String databaseName) { - this.tableName = tableName; - this.databaseName = databaseName; - } - - public String getTableName() { - return tableName; - } - - public String getDatabaseName() { - return databaseName; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java deleted file mode 100644 index 728837d..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java +++ /dev/null @@ 
-1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api.exception; - -public class ExecutionTimeoutException extends RuntimeException { - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java deleted file mode 100644 index 28b8a50..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api.exception; - -public class SchedulerException extends RuntimeException { - - public SchedulerException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java deleted file mode 100644 index 315a09b..0000000 --- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store.api.exception; - -/** Checked exception (extends Exception) offering no-arg, message-only and cause-wrapping constructors. */ public class StoreException extends Exception { - - /** Creates the exception with no detail message and no cause. */ public StoreException() { - super(); - } - - /** Creates the exception with the given detail message and no cause. */ public StoreException(String message) { - super(message); - } - - /** Wraps e as the cause; per Exception(Throwable), the detail message becomes e.toString() (or null if e is null). */ public StoreException(Exception e) { - super(e); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java new file mode 100644 index 0000000..4b79ee4 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.store.devapi; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.sdk.store.Row; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +/** + * A Loader is used to load data from files to the table + */ +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public interface DataLoader extends TransactionalOperation, Serializable { + /** + * Trigger the load operation + * @throws CarbonException if any error occurs + */ + void load() throws CarbonException; + + /** + * Append a batch of rows. + * @param rows rows to append + * @param schema schema of the input row + * @throws CarbonException if any error occurs + */ + void append(Iterator<Row> rows, StructType schema) throws CarbonException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java new file mode 100644 index 0000000..c6c5628 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.devapi; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public interface DataScanner<T> extends Serializable { + + /** + * Perform a scan in a distributed compute framework like Spark, Presto, etc. + * Filter/Projection/Limit operation is pushed down to the scan. + * + * This should be used with {@link Pruner#prune(TableIdentifier, Expression)} + * in a distributed compute environment. It enables the framework to + * do a parallel scan by creating multiple {@link ScanUnit} and perform + * parallel scan in worker, such as Spark executor + * + * The return result is in batch so that the caller can start next + * level of computation before getting all results, such as + * implementing a `prefetch` execution model. + * + * @param input one scan unit + * @return scan result, the result is returned in batch + * @throws CarbonException if any error occurs + */ + Iterator<? 
extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java new file mode 100644 index 0000000..01c2008 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.devapi; + +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.sdk.store.CarbonStore; +import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor; +import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +/** + * Internal API for engine integration developers + */ +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public interface InternalCarbonStore extends CarbonStore { + + /** + * Get CarbonTable object from the store + * + * @param tableIdentifier table identifier + * @return CarbonTable object + * @throws CarbonException if any error occurs + */ + CarbonTable getCarbonTable(TableIdentifier tableIdentifier) throws CarbonException; + + /** + * Return a new Loader that can be used to load data in distributed compute framework + * @param load descriptor for load operation + * @return a new Loader + * @throws CarbonException if any error occurs + */ + DataLoader newLoader(LoadDescriptor load) throws CarbonException; + + /** + * Return a new Scanner that can be used in for parallel scan + * + * @param tableIdentifier table to scan + * @param scanOption options for scan, use {@link ScanOption} for the map key + * @param readSupport read support to convert the row to output object + * @param <T> the target object type contain in {@link ResultBatch} + * @return a new Scanner + * @throws CarbonException if any error occurs + */ + <T> Scanner<T> newScanner( + TableIdentifier tableIdentifier, + ScanDescriptor scanDescriptor, + Map<String, String> scanOption, + CarbonReadSupport<T> 
readSupport) throws CarbonException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java new file mode 100644 index 0000000..c875aa1 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.devapi; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.sdk.store.conf.StoreConf; +import org.apache.carbondata.store.impl.InternalCarbonStoreImpl; + +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public class InternalCarbonStoreFactory { + + private static final Map<String, InternalCarbonStore> stores = new ConcurrentHashMap<>(); + + public static synchronized InternalCarbonStore getStore(String storeName, StoreConf conf) + throws IOException { + InternalCarbonStore store = stores.getOrDefault(storeName, newStore(conf)); + stores.putIfAbsent(storeName, store); + return store; + } + + private static InternalCarbonStore newStore(StoreConf conf) throws IOException { + return new InternalCarbonStoreImpl(conf); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java new file mode 100644 index 0000000..4a1d2e5 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.devapi; + +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.sdk.store.descriptor.TableIdentifier; +import org.apache.carbondata.sdk.store.exception.CarbonException; + +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public interface Pruner { + + /** + * Return an array of ScanUnit which will be the input in + * {@link Scanner#scan(ScanUnit)} + * + * Implementation will leverage index to prune using specified + * filter expression + * + * @param table table identifier + * @param filterExpression expression of filter predicate given by user + * @return list of ScanUnit which should be passed to + * {@link Scanner#scan(ScanUnit)} + * @throws CarbonException if any error occurs + */ + List<ScanUnit> prune(TableIdentifier table, Expression filterExpression) throws CarbonException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java new file mode 100644 index 0000000..221e0f6 --- /dev/null +++ 
b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.devapi; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public interface ResultBatch<T> { + + /** + * Return true if the result is returned in columnar batch, otherwise is row by row. + * By default, it is columnar batch. + */ + default boolean isColumnar() { + return true; + } + + /** + * Return true if there is more elements in this batch. + */ + boolean hasNext(); + + /** + * Return next item. + * If {@link #isColumnar()} return true, there is only one element in this batch + * which is {@link ColumnarBatch}, otherwise, this batch return row by row, caller + * should call next() until no element left. 
+ */ + T next(); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java new file mode 100644 index 0000000..8e5797d --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.devapi; + +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; + +@InterfaceAudience.Developer("Integration") +@InterfaceStability.Unstable +public class ScanOption { + + /** batch size in number of rows in one {@link ResultBatch} */ + public static final String BATCH_SIZE = "batchSize"; + + /** + * set to true if return in row major object in {@link ResultBatch}, + * otherwise columnar object is returned + */ + public static final String ROW_MAJOR = "rowMajor"; + + /** + * set to true if enable remote prune by RPC call, + * otherwise prune executes in caller's JVM + */ + public static final String REMOTE_PRUNE = "remotePrune"; + + /** + * set to true if enable operator pushdown like scan and load + * otherwise operation executes in caller's JVM + */ + public static final String OP_PUSHDOWN = "operatorPushDown"; + + /** + * Return true if REMOTE_PRUNE is set, default is false + */ + public static boolean isRemotePrune(Map<String, String> options) { + if (options == null) { + return false; + } + return Boolean.valueOf(options.getOrDefault(REMOTE_PRUNE, "false")); + } + + /** + * Return true if REMOTE_PRUNE is set, default is false + */ + public static boolean isOperatorPushdown(Map<String, String> options) { + if (options == null) { + return false; + } + return Boolean.valueOf(options.getOrDefault(OP_PUSHDOWN, "false")); + } +} \ No newline at end of file