http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala index d284e50..f421d44 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -113,11 +113,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd //data source file format if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO @@ -162,11 +162,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO @@ -184,6 +184,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd cleanTestData() } + // TODO: Make the sparkCarbonFileFormat to work without index file test("Read sdk writer output file without Carbondata file should fail") { buildTestData(false) deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) @@ -194,11 +195,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd // data source file format if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO @@ -224,11 +225,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd //data source file format if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO @@ -254,11 +255,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO @@ -303,11 +304,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd if (sqlContext.sparkContext.version.startsWith("2.1")) { //data source file format - sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (sqlContext.sparkContext.version.startsWith("2.2")) { //data source file format sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala index 9a46676..de91f2a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala @@ -132,11 +132,11 @@ object TestSparkCarbonFileFormatWithSparkSession { //data source file format if (spark.sparkContext.version.startsWith("2.1")) { //data source file format - spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + spark.sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """) } else if (spark.sparkContext.version.startsWith("2.2")) { //data source file format spark.sql( - s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION |'$filePath' """.stripMargin) } else{ // TO DO http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 4378c15..2ba6e5e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CarbonDataMergerUtil @@ -63,9 +63,9 @@ class CarbonIUDMergerRDD[K, V]( val defaultParallelism = sparkContext.defaultParallelism val noOfBlocks = 0 - CarbonTableInputFormat.setSegmentsToAccess( + CarbonInputFormat.setSegmentsToAccess( job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava) - CarbonTableInputFormat.setTableInfo( + CarbonInputFormat.setTableInfo( job.getConfiguration, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 9ce4904..c9237d1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -48,7 +48,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatus import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -280,10 +280,10 @@ class CarbonMergerRDD[K, V]( SparkHadoopUtil.get.addCredentials(jobConf) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) - CarbonTableInputFormat.setPartitionsToPrune( + CarbonInputFormat.setPartitionsToPrune( job.getConfiguration, carbonMergerMapping.currentPartitions.map(_.asJava).orNull) - CarbonTableInputFormat.setTableInfo(job.getConfiguration, + CarbonInputFormat.setTableInfo(job.getConfiguration, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) var updateDetails: UpdateVO = null // initialise query_id for job http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 452db56..2c7c27f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -45,7 +45,7 @@ import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResult import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonCompactionUtil import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor @@ -97,7 +97,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, val job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier, partitionIds.toList.asJava, job) - CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) + CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) job.getConfiguration.set("query.id", queryId) val splits = format.getSplitsOfOneSegment(job, segmentId, http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 874584d..9d2b6e5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant import org.apache.carbondata.core.statusmanager.FileFormat import org.apache.carbondata.core.util._ import org.apache.carbondata.hadoop._ -import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics @@ -90,7 +90,7 @@ class CarbonScanRDD( val jobConf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jobConf) val job = Job.getInstance(jobConf) - val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal") + val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat") val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) { prepareFileInputFormatForDriver(job.getConfiguration) } else { @@ -436,53 +436,53 @@ class CarbonScanRDD( } def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = { - CarbonTableInputFormat.setTableInfo(conf, tableInfo) - CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) - CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) + CarbonInputFormat.setTableInfo(conf, tableInfo) + CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) + CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) if (partitionNames != null) { - CarbonTableInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) + CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) } createInputFormat(conf) } def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = { - CarbonFileInputFormat.setTableInfo(conf, tableInfo) - CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) - CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) + CarbonInputFormat.setTableInfo(conf, tableInfo) + CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) + CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) if (partitionNames != null) { - CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) + CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) } createFileInputFormat(conf) } - private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = { - CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport) + private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = { + CarbonInputFormat.setCarbonReadSupport(conf, readSupport) val tableInfo1 = getTableInfo - CarbonTableInputFormat.setTableInfo(conf, tableInfo1) - CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) - CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) - CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl) + CarbonInputFormat.setTableInfo(conf, tableInfo1) + CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) + CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) + CarbonInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl) createInputFormat(conf) } private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = { val format = new CarbonFileInputFormat[Object] - CarbonFileInputFormat.setTablePath(conf, + CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath)) - CarbonFileInputFormat.setQuerySegment(conf, identifier) - CarbonFileInputFormat.setFilterPredicates(conf, filterExpression) - CarbonFileInputFormat.setColumnProjection(conf, columnProjection) - CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob) + CarbonInputFormat.setQuerySegment(conf, identifier) + CarbonInputFormat.setFilterPredicates(conf, filterExpression) + CarbonInputFormat.setColumnProjection(conf, columnProjection) + CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob) if (CarbonProperties.getInstance().getProperty( CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) { - CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) + CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob) } // when validate segments is disabled in thread local update it to CarbonTableInputFormat val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (carbonSessionInfo != null) { - CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams + CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + identifier.getCarbonTableIdentifier.getDatabaseName + "." + identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean) @@ -493,22 +493,22 @@ class CarbonScanRDD( private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { val format = new CarbonTableInputFormat[Object] - CarbonTableInputFormat.setTablePath(conf, + CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath)) - CarbonTableInputFormat.setQuerySegment(conf, identifier) - CarbonTableInputFormat.setFilterPredicates(conf, filterExpression) - CarbonTableInputFormat.setColumnProjection(conf, columnProjection) - CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) + CarbonInputFormat.setQuerySegment(conf, identifier) + CarbonInputFormat.setFilterPredicates(conf, filterExpression) + CarbonInputFormat.setColumnProjection(conf, columnProjection) + CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob) if (CarbonProperties.getInstance().getProperty( CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) { - CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) + CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob) } // when validate segments is disabled in thread local update it to CarbonTableInputFormat val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (carbonSessionInfo != null) { - CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams + CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + identifier.getCarbonTableIdentifier.getDatabaseName + "." + identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 1656efa..48ebdb4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.util.CommonUtil @@ -154,7 +154,7 @@ object PartitionUtils { val job = new Job(jobConf) val format = CarbonInputFormatUtil .createCarbonTableInputFormat(identifier, partitionIds.asJava, job) - CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) + CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) val splits = format.getSplitsOfOneSegment(job, segmentId, oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo) val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index c286c50..2d19fd4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} case class CarbonCountStar( attributesRaw: Seq[Attribute], @@ -45,7 +45,7 @@ case class CarbonCountStar( override def doExecute(): RDD[InternalRow] = { val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier) - CarbonTableInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier) + CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier) // get row count val rowCount = CarbonUpdateUtil.getRowCount( http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 10d55ef..8eaeab1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -43,7 +43,7 @@ import org.apache.carbondata.core.mutate.data.RowCountDetailsVO import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.processing.exception.MultipleMatchingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.spark.DeleteDelataResultImpl @@ -90,7 +90,7 @@ object DeleteExecution { } val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier) - CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) + CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) val keyRdd = deleteRdd.map({ row => val tupleId: String = row .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index cf22569..74da11a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -150,7 +150,7 @@ case class CarbonDropTableCommand( // delete table data only if it is not external table if (FileFactory.isFileExist(tablePath, fileType) && - !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) { + !(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) { val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala index fa54e0d..2daece3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala @@ -51,7 +51,7 @@ import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats} -import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob} +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob} import org.apache.carbondata.spark.util.CarbonScalaUtil @InterfaceAudience.User @@ -105,9 +105,9 @@ class SparkCarbonFileFormat extends FileFormat } } - override def shortName(): String = "Carbonfile" + override def shortName(): String = "carbonfile" - override def toString: String = "Carbonfile" + override def toString: String = "carbonfile" override def hashCode(): Int = getClass.hashCode() @@ -179,10 +179,9 @@ class SparkCarbonFileFormat extends FileFormat supportBatchValue = supportBatch(sparkSession, dataSchema) } - CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy") - CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default") + CarbonInputFormat.setTableName(job.getConfiguration, "externaldummy") + CarbonInputFormat.setDatabaseName(job.getConfiguration, "default") CarbonMetadata.getInstance.removeTable("default_externaldummy") - val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration) val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object] (file: PartitionedFile) => { @@ -207,9 +206,9 @@ class SparkCarbonFileFormat extends FileFormat conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy") conf1.set("mapreduce.input.carboninputformat.databaseName", "default") conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath) - CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection) + CarbonInputFormat.setColumnProjection(conf1, carbonProjection) filter match { - case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c) + case Some(c) => CarbonInputFormat.setFilterPredicates(conf1, c) case None => None } val attemptContext = new TaskAttemptContextImpl(conf1, attemptId) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index d85ef68..b20349c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -112,7 +112,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (isCarbonTable) { val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName, alterTableChangeDataTypeModel.tableName)(sparkSession) - if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") } else { @@ -128,7 +128,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (isCarbonTable) { val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName, alterTableAddColumnsModel.tableName)(sparkSession) - if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") } else { @@ -144,7 +144,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (isCarbonTable) { val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName, alterTableDropColumnModel.tableName)(sparkSession) - if (carbonTable != null && carbonTable.isFileLevelExternalTable) { + if (carbonTable != null && carbonTable.isFileLevelFormat) { throw new MalformedCarbonCommandException( "Unsupported alter operation on Carbon external fileformat table") } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 69fd366..55eb5ac 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -265,7 +265,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, val table = try { val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath) if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) && - provider.equalsIgnoreCase("'Carbonfile'")) { + provider.equalsIgnoreCase("'carbonfile'")) { SchemaReader.inferSchema(identifier) } else { @@ -277,12 +277,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader) } // set "_external" property, so that DROP TABLE will not delete the data - if (provider.equalsIgnoreCase("'Carbonfile'")) { - table.getFactTable.getTableProperties.put("_filelevelexternal", "true") + if (provider.equalsIgnoreCase("'carbonfile'")) { + table.getFactTable.getTableProperties.put("_filelevelformat", "true") table.getFactTable.getTableProperties.put("_external", "false") } else { table.getFactTable.getTableProperties.put("_external", "true") - table.getFactTable.getTableProperties.put("_filelevelexternal", "false") + table.getFactTable.getTableProperties.put("_filelevelformat", "false") } table } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 1f5808a..e8ae67b 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -326,7 +326,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || - fileStorage.equalsIgnoreCase("'Carbonfile'") || + fileStorage.equalsIgnoreCase("'carbonfile'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index c28e4ba..3cb9bd6 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -325,7 +325,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || - fileStorage.equalsIgnoreCase("'Carbonfile'") || + fileStorage.equalsIgnoreCase("'carbonfile'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 0a95c24..4df04b9 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -145,16 +145,16 @@ class StreamHandoffRDD[K, V]( val inputSplit = split.asInstanceOf[HandoffPartition].split.value val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val hadoopConf = new Configuration() - CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName) - CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName) - CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath) + CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName) + CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName) + CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath) val projection = new CarbonProjection val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName) (0 until dataFields.size()).foreach { index => projection.addColumn(dataFields.get(index).getColName) } - CarbonTableInputFormat.setColumnProjection(hadoopConf, projection) - CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) + CarbonInputFormat.setColumnProjection(hadoopConf, projection) + CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) val format = new CarbonTableInputFormat[Array[Object]]() val model = format.createQueryModel(inputSplit, attemptContext)
