Repository: carbondata
Updated Branches:
  refs/heads/master 94a4f8314 -> 625844784


[CARBONDATA-3025] Handle passing spark appname for partition table and file 
format

Changes in this PR

1. Data load with partition table file format fails, as the appname is not in 
carbon properties in the executor.
2. This PR sets the spark appname in carbon properties, which will be written to 
the carbondata footer.
3. The appname is set in the hadoop conf and then set in carbon properties in the 
executor by reading it back from that conf. Instead of hardcoding the spark 
property key, the appname is obtained from the exposed API appName.

This closes #2861


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62584478
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62584478
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62584478

Branch: refs/heads/master
Commit: 6258447849f0fb0934b6a0e728d99d62223a2854
Parents: 94a4f83
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Fri Oct 26 19:34:04 2018 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Oct 31 20:52:51 2018 +0800

----------------------------------------------------------------------
 .../carbondata/hadoop/api/CarbonTableOutputFormat.java      | 6 ++++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala          | 2 +-
 .../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala   | 9 ++++-----
 .../execution/datasources/SparkCarbonFileFormat.scala       | 6 +-----
 .../sql/execution/datasources/SparkCarbonTableFormat.scala  | 8 ++++----
 5 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f0f2858..0bcd7e1 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -236,6 +236,12 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Obje
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
       final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = 
getLoadModel(taskAttemptContext.getConfiguration());
+    String appName =
+        
taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+    if (null != appName) {
+      CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
appName);
+    }
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
     short sdkWriterCores = loadModel.getSdkWriterCores();
     int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index d794636..338180d 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -68,7 +68,7 @@ object DataLoadProcessBuilderOnSpark {
 
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
+        sparkSession.sparkContext.appName)
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index cdfce56..ce08f8f 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -37,6 +37,10 @@ abstract class CarbonRDD[T: ClassTag](
     @transient private val ss: SparkSession,
     @transient private var deps: Seq[Dependency[_]]) extends 
RDD[T](ss.sparkContext, deps) {
 
+  @transient val sparkAppName: String = ss.sparkContext.appName
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName)
+
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (info == null || info.getSessionParams == null) {
@@ -57,11 +61,6 @@ abstract class CarbonRDD[T: ClassTag](
 
   protected def internalGetPartitions: Array[Partition]
 
-
-  CarbonProperties.getInstance()
-    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-      ss.sparkContext.getConf.get("spark.app.name"))
-
   override def getPartitions: Array[Partition] = {
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
     internalGetPartitions

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 9e3a4c8..cd2035c 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -121,15 +121,11 @@ class SparkCarbonFileFormat extends FileFormat
       dataSchema: StructType): OutputWriterFactory = {
 
     val conf = job.getConfiguration
-
+    conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
sparkSession.sparkContext.appName)
     val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
     model.setLoadWithoutConverterStep(true)
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
-
     new OutputWriterFactory {
       override def newInstance(
           path: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 47d6a71..148d317 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -121,10 +121,6 @@ with Serializable {
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     model.setLoadWithoutConverterStep(true)
-    carbonProperty
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
-
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {
       conf.set("carbon.staticpartition", staticPartition)
@@ -159,6 +155,7 @@ with Serializable {
     if (updateTimeStamp.isDefined) {
       conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
     }
+    conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
sparkSession.sparkContext.appName)
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
@@ -175,6 +172,9 @@ with Serializable {
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
         val model = 
CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+        val appName = 
context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
+        CarbonProperties.getInstance().addProperty(
+          CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
         val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
         val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
         
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, 
storeLocation)

Reply via email to