[CARBONDATA-2990] Queries slow down after some time due to broadcast issue

Problem
It is observed that during consecutive runs of queries, the queries slow 
down after some time. This degrades query performance.
No exception is thrown in the driver or executor logs, but the logs show 
that the time to broadcast the Hadoop conf increases after every query run.

Analysis

This happens because CarbonData defines its own SerializableConfiguration 
class instead of using the one from Spark. Spark registers its class with the 
Kryo serializer, so serialization using Kryo is fast. CarbonData does not get 
the same benefit because it uses its own class.
Spark's internal SizeEstimator calculates the size of the object, and the 
CarbonData class holds a few extra objects, which increases the computation 
time.
Solution
Use the Spark class instead of maintaining a separate class in CarbonData.

This closes #2803


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c7b3399
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c7b3399
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c7b3399

Branch: refs/heads/branch-1.5
Commit: 3c7b33992e06d81fb47d81bf8ccf7884f845b3ff
Parents: 19097f2
Author: manishgupta88 <[email protected]>
Authored: Mon Oct 8 19:38:54 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Tue Oct 9 13:38:51 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CsvRDDHelper.scala    |  4 +--
 .../load/DataLoadProcessBuilderOnSpark.scala    |  6 ++--
 .../load/DataLoadProcessorStepOnSpark.scala     |  6 ++--
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |  4 +--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 33 --------------------
 .../apache/spark/sql/util/SparkSQLUtil.scala    | 21 ++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  4 +--
 .../spark/sql/CarbonDictionaryDecoder.scala     |  8 ++---
 .../management/CarbonLoadDataCommand.scala      |  7 +++--
 .../command/mutation/DeleteExecution.scala      |  7 ++---
 .../command/mutation/HorizontalCompaction.scala |  8 ++---
 11 files changed, 46 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 8d6dd32..5511645 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -41,7 +42,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -110,7 +110,7 @@ object CsvRDDHelper {
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializableConfiguration = 
SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf)
     val readFunction = new (PartitionedFile => Iterator[InternalRow]) with 
Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 2e74a94..923676c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,7 +36,6 @@ import 
org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * Use sortBy operator in spark to load the data
@@ -66,7 +66,7 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
-    val conf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
     val inputRDD = originRDD
       .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, 
modelBroadcast))
@@ -121,7 +121,7 @@ object DataLoadProcessBuilderOnSpark {
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, 
modelBroadcast,
-        writeStepRowCounter, conf))
+        writeStepRowCounter, conf.value.value))
 
     // clean cache only if persisted and keeping unpersist non-blocking as 
non-blocking call will
     // not have any functional impact as spark automatically monitors the 
cache usage on each node

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f17bd91..f1a12bf 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -42,7 +42,7 @@ import 
org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, 
CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, 
SerializableConfiguration, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
@@ -230,8 +230,8 @@ object DataLoadProcessorStepOnSpark {
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
       rowCounter: Accumulator[Int],
-      conf: Broadcast[SerializableConfiguration]) {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+      conf: Configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
     var model: CarbonLoadModel = null
     var tableName: String = null
     var rowConverter: RowConverterImpl = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 3a02f85..83cd59c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, 
TaskContext}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -49,8 +48,7 @@ abstract class CarbonRDD[T: ClassTag](
 
   @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
 
-  val config: Broadcast[SerializableConfiguration] = sparkContext
-    .broadcast(new SerializableConfiguration(hadoopConf))
+  val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient sparkSession: SparkSession, @transient oneParent: 
RDD[_]) =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 87c8e4c..6076e4a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -54,39 +54,6 @@ import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
-class SerializableConfiguration(@transient var value: Configuration) extends 
Serializable {
-
-  @transient
-  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private def writeObject(out: ObjectOutputStream): Unit =
-    try {
-      out.defaultWriteObject()
-      value.write(out)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-
-
-  private def readObject(in: ObjectInputStream): Unit =
-    try {
-      value = new Configuration(false)
-      value.readFields(in)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-}
-
 /**
  * This partition class use to split by Host
  *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index b7d47a0..9ffe6e1 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.util
 
 import java.lang.reflect.Method
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, 
SerializableConfiguration, SparkUtil}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = 
sparkSession.sessionState
@@ -99,4 +102,20 @@ object SparkSQLUtil {
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
+
+  /**
+   * Method to broadcast a variable using spark SerializableConfiguration class
+   *
+   * @param sparkContext
+   * @param hadoopConf
+   * @return
+   */
+  def broadCastHadoopConf(sparkContext: SparkContext,
+      hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
+    sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf))
+  }
+
+  def getSerializableConfigurableInstance(hadoopConf: Configuration): 
SerializableConfiguration = {
+    new SerializableConfiguration(hadoopConf)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6350b50..0ec3bc6 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CompactionModel, 
ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -728,7 +728,7 @@ object CarbonDataRDDFactory {
 
       // because partitionId=segmentIdIndex*parallelism+RandomPart and 
RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
-      val conf = sqlContext.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+      val conf = 
SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, 
hadoopConf)
       partitionByRdd.map(_._2).mapPartitions { partition =>
         
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d0ed56e..ff7ac60 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, 
UnaryExecNode}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, 
DictionaryColumnUniqueIdentifier}
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, 
SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
 
 /**
  * It decodes the data.
@@ -76,8 +76,8 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getTableName, carbonTable)
       }.toMap
 
-      val conf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, 
sparkSession.sessionState.newHadoopConf())
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f7a5f42..43c8b86 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -42,6 +42,7 @@ import 
org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSou
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -78,7 +79,7 @@ import 
org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro
 import 
org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, 
DataLoadProcessorStepOnSpark}
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, 
SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
@@ -986,8 +987,8 @@ case class CarbonLoadDataCommand(
           array
         }
       }
-    val conf = sparkSession.sparkContext
-      .broadcast(new 
SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, 
sparkSession.sessionState.newHadoopConf())
     val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 4921b33..7e7f671 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,7 +50,6 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object DeleteExecution {
   val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getName)
@@ -120,9 +120,8 @@ object DeleteExecution {
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
         keyRdd.partitions.length)
 
-    val conf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(sparkSession
-      .sessionState.newHadoopConf()))
-
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, 
sparkSession.sessionState.newHadoopConf())
     val rdd = rowContRdd.join(keyRdd)
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, 
Iterable[Row]))]) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 66066ed..35fc3c3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -26,16 +26,16 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.AlterTableModel
 import 
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
-import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, 
CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object HorizontalCompaction {
 
@@ -191,8 +191,8 @@ object HorizontalCompaction {
 
       val timestamp = factTimeStamp
       val updateStatusDetails = 
segmentUpdateStatusManager.getUpdateStatusDetails
-      val conf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, 
sparkSession.sessionState.newHadoopConf())
       val result = rdd1.mapPartitions(iter =>
         new Iterator[Seq[CarbonDataMergerUtilResult]] {
           
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)

Reply via email to