Repository: carbondata Updated Branches: refs/heads/master 94a4f8314 -> 625844784
[CARBONDATA-3025] Handle passing spark appname for partition table and file format Changes in this PR: 1. Data load with partition table file format fails, as the appname is not in carbonproperties in the executor. 2. This PR sets the spark appname in carbon properties, which will be written to the carbondata footer. 3. The appname is set in the hadoop conf and then set in carbonproperties in the executor by reading it from that conf; instead of hardcoding the spark property key, it is obtained from the exposed appName API. This closes #2861 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62584478 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62584478 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62584478 Branch: refs/heads/master Commit: 6258447849f0fb0934b6a0e728d99d62223a2854 Parents: 94a4f83 Author: akashrn5 <akashnilu...@gmail.com> Authored: Fri Oct 26 19:34:04 2018 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Oct 31 20:52:51 2018 +0800 ---------------------------------------------------------------------- .../carbondata/hadoop/api/CarbonTableOutputFormat.java | 6 ++++++ .../spark/load/DataLoadProcessBuilderOnSpark.scala | 2 +- .../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala | 9 ++++----- .../execution/datasources/SparkCarbonFileFormat.scala | 6 +----- .../sql/execution/datasources/SparkCarbonTableFormat.scala | 8 ++++---- 5 files changed, 16 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index f0f2858..0bcd7e1 100644 
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -236,6 +236,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter( final TaskAttemptContext taskAttemptContext) throws IOException { final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); + String appName = + taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME); + if (null != appName) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName); + } //if loadModel having taskNo already(like in SDK) then no need to overwrite short sdkWriterCores = loadModel.getSdkWriterCores(); int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1; http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index d794636..338180d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -68,7 +68,7 @@ object DataLoadProcessBuilderOnSpark { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.getConf.get("spark.app.name")) + sparkSession.sparkContext.appName) val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) // 1. 
Input http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index cdfce56..ce08f8f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -37,6 +37,10 @@ abstract class CarbonRDD[T: ClassTag]( @transient private val ss: SparkSession, @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) { + @transient val sparkAppName: String = ss.sparkContext.appName + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName) + val carbonSessionInfo: CarbonSessionInfo = { var info = ThreadLocalSessionInfo.getCarbonSessionInfo if (info == null || info.getSessionParams == null) { @@ -57,11 +61,6 @@ abstract class CarbonRDD[T: ClassTag]( protected def internalGetPartitions: Array[Partition] - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - ss.sparkContext.getConf.get("spark.app.name")) - override def getPartitions: Array[Partition] = { ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf) internalGetPartitions http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index 9e3a4c8..cd2035c 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -121,15 +121,11 @@ class SparkCarbonFileFormat extends FileFormat dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - + conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema) model.setLoadWithoutConverterStep(true) CarbonTableOutputFormat.setLoadModel(conf, model) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.getConf.get("spark.app.name")) - new OutputWriterFactory { override def newInstance( path: String, http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index 47d6a71..148d317 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -121,10 +121,6 @@ with Serializable { model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) 
model.setLoadWithoutConverterStep(true) - carbonProperty - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.getConf.get("spark.app.name")) - val staticPartition = options.getOrElse("staticpartition", null) if (staticPartition != null) { conf.set("carbon.staticpartition", staticPartition) @@ -159,6 +155,7 @@ with Serializable { if (updateTimeStamp.isDefined) { conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) } + conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) CarbonTableOutputFormat.setLoadModel(conf, model) new OutputWriterFactory { @@ -175,6 +172,9 @@ with Serializable { dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) + val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName) val taskNumber = generateTaskNumber(path, context, model.getSegmentId) val storeLocation = CommonUtil.getTempStoreLocations(taskNumber) CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)