Repository: carbondata Updated Branches: refs/heads/master 6e224dce6 -> 4430178c0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
new file mode 100644
index 0000000..99ce7fa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.{DataLoadingUtil, Util}
+
+/**
+ * Spark file format registered under the short name "carbondata". Rows are written
+ * through [[CarbonTableOutputFormat]]; the schema is never inferred from files.
+ */
+class CarbonFileFormat
+  extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
+
+  override def shortName(): String = "carbondata"
+
+  /** Schema inference is not supported, so this always returns None. */
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    None
+  }
+
+  /**
+   * Configures `job` for a carbon load and returns the factory of the per-task writers.
+   *
+   * @param options    must contain "tableName" and "overwrite"; "dbName", "onepass",
+   *                   "dicthost" and "dictport" are optional
+   * @param dataSchema columns of the incoming rows; their lower-cased names followed by the
+   *                   table's partition columns become the "fileheader" load option
+   */
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    conf.setClass(
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
+      classOf[CarbonOutputCommitter],
+      classOf[CarbonOutputCommitter])
+    // Marker that CarbonSQLHadoopMapReduceCommitProtocol looks up in the task configuration.
+    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+    // NOTE: set on the session conf, so it stays in effect after this write.
+    sparkSession.sessionState.conf.setConfString(
+      "spark.sql.sources.commitProtocolClass",
+      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+    val table = CarbonEnv.getCarbonTable(
+      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
+    val model = new CarbonLoadModel
+    val carbonProperty = CarbonProperties.getInstance()
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+    val factTable = table.getTableInfo.getFactTable
+    val tableProperties = factTable.getTableProperties
+    // Precedence: table property, then the load-option property, then the global property.
+    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    // A table without partition info contributes no columns; joining both lists in one
+    // mkString also avoids a dangling "," when either side is empty.
+    val partitionColumns: Seq[String] = Option(factTable.getPartitionInfo)
+      .map(_.getColumnSchemaList.asScala.map(_.getColumnName.toLowerCase))
+      .getOrElse(Seq.empty)
+    optionsFinal.put(
+      "fileheader",
+      (dataSchema.fields.map(_.name.toLowerCase) ++ partitionColumns).mkString(","))
+    DataLoadingUtil.buildCarbonLoadModel(
+      table,
+      carbonProperty,
+      options,
+      optionsFinal,
+      model,
+      conf
+    )
+    model.setPartitionId("0")
+    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
+    model.setDictionaryServerHost(options.get("dicthost").orNull)
+    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
+    CarbonTableOutputFormat.setLoadModel(conf, model)
+    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+
+    new OutputWriterFactory {
+
+      /**
+       * Picks the temp store location(s) for this task and creates its writer.
+       */
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
+        val isCarbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+        val tmpLocationSuffix = File.separator + System.nanoTime()
+        // Configured local dirs; empty when local dirs are disabled or none are configured.
+        val localDirs: Array[String] = if (isCarbonUseLocalDir) {
+          Option(Util.getConfiguredLocalDirs(SparkEnv.get.conf)).getOrElse(Array.empty[String])
+        } else {
+          Array.empty[String]
+        }
+        val storeLocation: Array[String] = if (localDirs.isEmpty) {
+          // nothing usable configured: fall back to the JVM temp dir
+          Array(System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+        } else if (isCarbonUseMultiDir) {
+          // use all the yarn dirs
+          localDirs.map(_ + tmpLocationSuffix)
+        } else {
+          // use single dir
+          Array(localDirs(Random.nextInt(localDirs.length)) + tmpLocationSuffix)
+        }
+        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".carbondata"
+      }
+
+    }
+  }
+}
+
+/**
+ * Commit protocol installed by CarbonFileFormat.prepareWrite. For tasks whose configuration
+ * carries the "carbon.commit.protocol" marker, a file requested for an absolute directory is
+ * created through newTaskTempFile instead of newTaskTempFileAbsPath.
+ */
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
+      absoluteDir: String,
+      ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
+    if (carbonFlow != null) {
+      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
+    } else {
+      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    }
+  }
+}
+
+/**
+ * Bridges the writer entry points so that the same writer compiles against Spark 2.1 and
+ * 2.2: both writeInternal and write(InternalRow) delegate to writeCarbon.
+ */
+private trait AbstractCarbonOutputWriter {
+  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+  def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def write(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def writeCarbon(row: InternalRow): Unit
+}
+
+/**
+ * Writes the rows of one task through a [[CarbonRecordWriter]].
+ *
+ * @param path       output file path; the directories between the task attempt id and the
+ *                   file name are read as partition "column=value" segments
+ * @param context    task attempt this writer belongs to
+ * @param fieldTypes data types of the columns read from each row
+ */
+private class CarbonOutputWriter(path: String,
+    context: TaskAttemptContext,
+    fieldTypes: Seq[DataType])
+  extends OutputWriter with AbstractCarbonOutputWriter {
+  // Partition directory names exactly as they appear in the path.
+  // NOTE(review): assumed to be escaped with escapePathName, as the unescaping implies.
+  private val escapedPartitions: Array[String] = getPartitionsFromPath(path, context)
+  val partitions: Array[String] = escapedPartitions.map(ExternalCatalogUtils.unescapePathName)
+  // Values appended to every row. Splitting the escaped form at the first "=" only keeps
+  // empty values and values that contain "=" intact.
+  val partitionData: Array[String] = escapedPartitions.map { segment =>
+    segment.split("=", 2) match {
+      case Array(_, value) => ExternalCatalogUtils.unescapePathName(value)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid partition directory '$segment' in path $path")
+    }
+  }
+  val writable = new StringArrayWritable()
+
+  private val recordWriter: CarbonRecordWriter = {
+    // The anonymous subclass makes the output format write to the path chosen by Spark.
+    new CarbonTableOutputFormat() {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
+  }
+
+  // TODO Implement writesupport interface to support writing Row directly to recordwriter
+  /**
+   * Copies the non-null columns of `row` as strings, appends the partition values and
+   * passes the record to the record writer.
+   */
+  def writeCarbon(row: InternalRow): Unit = {
+    val data = new Array[String](fieldTypes.length + partitionData.length)
+    var i = 0
+    while (i < fieldTypes.length) {
+      if (!row.isNullAt(i)) {
+        // NOTE(review): every column is read with getString, so presumably all columns
+        // arrive as strings - confirm against the caller.
+        data(i) = row.getString(i)
+      }
+      i += 1
+    }
+    if (partitionData.length > 0) {
+      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
+    }
+    writable.set(data)
+    recordWriter.write(NullWritable.get(), writable)
+  }
+
+
+  override def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+
+  /** Closes the record writer, then writes this task's partition map file. */
+  override def close(): Unit = {
+    recordWriter.close(context)
+    val loadModel = recordWriter.getLoadModel
+    val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId)
+    // write partition info to new file.
+    val partitionList = new util.ArrayList[String]()
+    partitions.foreach(partitionList.add)
+    new PartitionMapFileStore().writePartitionMapFile(
+      segmentPath,
+      loadModel.getTaskNo,
+      partitionList)
+  }
+
+  /**
+   * Returns the directory names found between the task attempt id (or the table name, when
+   * the attempt id is not found past the start of the path) and the file name.
+   */
+  def getPartitionsFromPath(path: String, attemptContext: TaskAttemptContext): Array[String] = {
+    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
+    if (path.indexOf(attemptId) <= 0) {
+      val model = CarbonTableOutputFormat.getLoadModel(attemptContext.getConfiguration)
+      attemptId = model.getTableName + "/"
+    }
+    val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+    if (str.length > 0) {
+      str.split("/")
+    } else {
+      Array.empty
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 8b247f7..7f9bdf7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.format.DataType
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 8745900..21b81ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -74,8 +74,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           CarbonDropTableCommand(ifNotExists, identifier.database,
             identifier.table.toLowerCase)) :: Nil
       case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
-      _, child: LogicalPlan, overwrite, _) =>
-        ExecutedCommandExec(CarbonInsertIntoCommand(relation, child, overwrite)) :: Nil
+      partition, child: LogicalPlan, overwrite, _) =>
+        ExecutedCommandExec(CarbonInsertIntoCommand(relation, child, overwrite, partition)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
         FileUtils.createDatabaseDirectory(dbName, CarbonProperties.getStorePath)
         ExecutedCommandExec(createDb) :: Nil
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index de5fa7e..ff7e06a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -17,18 +17,25 @@
 
 package org.apache.spark.sql.hive
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
   private lazy val parser = sparkSession.sessionState.sqlParser
@@ -200,3 +207,73 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     }
   }
 }
+
+/**
+ * Analyzer rule that turns an insert into a carbon datasource relation into
+ * [[InsertIntoCarbonTable]], renaming the projected child columns to col0, col1, ...
+ */
+case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      // Nodes whose children are not resolved yet are returned unchanged.
+      case pending: LogicalPlan if !pending.childrenResolved => pending
+
+      case insert@InsertIntoTable(target: LogicalRelation, _, query, _, _)
+        if target.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        castChildOutput(insert, target, query)
+    }
+  }
+
+  /**
+   * Builds the [[InsertIntoCarbonTable]] node for `p`.
+   *
+   * CarbonException.analysisException is invoked when the table has more columns than
+   * DEFAULT_MAX_NUMBER_OF_COLUMNS, or when the child produces fewer columns than the table
+   * and the table is not a hive partition table.
+   */
+  def castChildOutput(p: InsertIntoTable,
+      relation: LogicalRelation,
+      child: LogicalPlan): LogicalPlan = {
+    val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val tableColumnCount = carbonDSRelation.carbonRelation.output.size
+    if (tableColumnCount > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      CarbonException.analysisException(
+        s"Maximum number of columns supported: " +
+        s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
+    }
+    val tooFewColumns = child.output.size < tableColumnCount &&
+                        !carbonDSRelation.carbonTable.isHivePartitionTable
+    if (tooFewColumns) {
+      CarbonException.analysisException(
+        "Cannot insert into target table because number of columns mismatch")
+    } else {
+      // Every child column is re-aliased by its position.
+      val newChildOutput = child.output.zipWithIndex.map { case (column, position) =>
+        val positionalName = s"col$position"
+        column match {
+          case alias: Alias =>
+            Alias(alias.child, positionalName)(alias.exprId)
+          case attribute: Attribute =>
+            Alias(attribute, positionalName)(NamedExpression.newExprId)
+          case other => other
+        }
+      }
+      val newChild: LogicalPlan = if (newChildOutput != child.output) {
+        Project(newChildOutput, child)
+      } else if (SPARK_VERSION.startsWith("2.1")) {
+        // the wrapped plan is read reflectively; the field name differs per Spark version
+        CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+      } else if (SPARK_VERSION.startsWith("2.2")) {
+        CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+      } else {
+        throw new UnsupportedOperationException(
+          s"Spark version $SPARK_VERSION is not supported")
+      }
+
+      val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
+
+      InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true)
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 623d309..c28cc44 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -1322,59 +1322,3 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { } } } - -/** - * Insert into carbon table from other source - */ -case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = { - plan.transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _) - if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child) - } - } - - def castChildOutput(p: InsertIntoTable, - relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan): LogicalPlan = { - if (relation.carbonRelation.output.size > CarbonCommonConstants - .DEFAULT_MAX_NUMBER_OF_COLUMNS) { - CarbonException.analysisException("Maximum number of columns supported:" + - s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}") - } - if (child.output.size >= relation.carbonRelation.output.size) { - val newChildOutput = child.output.zipWithIndex.map { columnWithIndex => - columnWithIndex._1 match { - case attr: Alias => - Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId) - case attr: Attribute => - Alias(attr, s"col${ columnWithIndex._2 
}")(NamedExpression.newExprId) - case attr => attr - } - } - val version = SPARK_VERSION - val newChild: LogicalPlan = if (newChildOutput == child.output) { - if (version.startsWith("2.1")) { - CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan] - } else if (version.startsWith("2.2")) { - CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan] - } else { - throw new UnsupportedOperationException(s"Spark version $version is not supported") - } - } else { - Project(newChildOutput, child) - } - - val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p) - - InsertIntoCarbonTable(relation, p.partition, newChild, overwrite, true) - } else { - CarbonException.analysisException( - "Cannot insert into target table because number of columns mismatch") - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 55d784f..79095ca 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -199,7 +199,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) // validate partition clause if (partitionFields.nonEmpty) { if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) { - throw new MalformedCarbonCommandException("Error: Invalid partition definition") + throw new MalformedCarbonCommandException("Error: Invalid partition definition") } // partition columns should not be part of the schema val badPartCols = partitionFields.map(_.partitionColumn).toSet.intersect(colNames.toSet) @@ 
-251,7 +251,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) } if (partitionerFields.nonEmpty) { if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) { - throw new MalformedCarbonCommandException("Error: Invalid partition definition") + throw new MalformedCarbonCommandException("Error: Invalid partition definition") } // partition columns should not be part of the schema val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet) http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index abe90f1..0cd1331 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -55,7 +55,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { @Override public String[] next() { - if (readBatch == null || !readBatch.hasNext()) { + if (readBatch == null || !readBatch.hasNext() && !close) { try { readBatch = queue.take(); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java 
index c32aa51..d3caa99 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java @@ -46,16 +46,16 @@ public abstract class AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel) { CarbonDataFileAttributes carbonDataFileAttributes; if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { - int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), + long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier())); // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will // be written in same segment. So the TaskNo should be incremented by 1 from max val. - int index = taskNo + 1; + long index = taskNo + 1; carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp()); } else { carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()), + new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()), loadModel.getFactTimeStamp()); } carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 48c5471..68a212e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -51,7 +51,7 @@ public class RowResultProcessor { CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation); CarbonDataFileAttributes carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()), + new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()), loadModel.getFactTimeStamp()); carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); carbonFactDataHandlerModel.setBucketId(bucketId); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java index b69815e..8bedd80 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java @@ -25,7 +25,7 @@ public class CarbonDataFileAttributes { /** * task Id which is unique for each spark task */ - private int taskId; + private long taskId; /** * load start time @@ -36,7 +36,7 @@ public class CarbonDataFileAttributes { * @param taskId * @param factTimeStamp */ - public CarbonDataFileAttributes(int taskId, long factTimeStamp) { + public CarbonDataFileAttributes(long taskId, long factTimeStamp) { this.taskId = taskId; this.factTimeStamp = factTimeStamp; } @@ -44,7 +44,7 @@ public class CarbonDataFileAttributes { /** * @return */ - public int getTaskId() { + public long getTaskId() { return taskId; } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4430178c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index a8ae513..96bd2e3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -223,7 +223,7 @@ public class CarbonFactDataHandlerModel { } CarbonDataFileAttributes carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(configuration.getTaskNo()), + new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()), (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP)); String carbonDataDirectoryPath = getCarbonDataFolderLocation(configuration);
