[CARBONDATA-3106] WrittenbyAPI not serialized in executor with globalsort

Problem:
When CARBON_WRITTEN_BY_APPNAME is added to CarbonProperties on the driver, 
it is not serialized to the executors during a global sort load.

Solution:
Set CARBON_WRITTEN_BY_APPNAME in the Hadoop configuration; on the executor 
side, read it from that configuration and add it to CarbonProperties.

This closes #2928


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2e0153bf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2e0153bf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2e0153bf

Branch: refs/heads/branch-1.5
Commit: 2e0153bfa20b8d263402dbb67a8c020dd4a63ddd
Parents: 6df965b
Author: Indhumathi27 <indhumathi...@gmail.com>
Authored: Fri Nov 16 21:49:16 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Nov 21 22:43:46 2018 +0530

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala        |  5 ++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala         |  6 +++++-
 .../store/writer/v3/CarbonFactDataWriterImplV3.java       | 10 +++++++---
 3 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 338180d..8ded6bd 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.appName)
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
sparkSession.sparkContext.appName)
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0a68fb0..2ca47b3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, 
BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, 
TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark {
       modelBroadcast: Broadcast[CarbonLoadModel],
       rowCounter: Accumulator[Int],
       conf: Configuration) {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME))
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
     var model: CarbonLoadModel = null
     var tableName: String = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index f168796..ccbc544 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -104,9 +104,13 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter {
           .convertFileFooterVersion3(blockletMetadata, blockletIndex, 
localCardinality,
               thriftColumnSchemaList.size());
       convertFileMeta.setIs_sort(isSorted);
-      
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
-          CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME));
+      String appName = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+      if (appName == null) {
+        throw new CarbonDataWriterException(
+            "DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null");
+      }
+      
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
 appName);
       
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
           CarbonVersionConstants.CARBONDATA_VERSION);
       // fill the carbon index details

Reply via email to