[CARBONDATA-2990] Queries slow down after some time due to broadcast issue Problem: During consecutive query runs, queries were observed to slow down after some time, degrading query performance. No exception is thrown in the driver or executor logs, but the logs show that the time taken to broadcast the Hadoop configuration increases after every query run.
Analysis: This happens because CarbonData overrides Spark's SerializableConfiguration class with its own copy. Spark registers its class with the Kryo serializer, so computations on it using Kryo are fast. CarbonData does not get the same benefit because it overrides the class. Spark's internal SizeEstimator calculates the size of each object, and the overridden CarbonData class holds a few extra objects, which makes this computation take longer and longer. Solution: Use the Spark class instead of overriding it in CarbonData. This closes #2803 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c7b3399 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c7b3399 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c7b3399 Branch: refs/heads/branch-1.5 Commit: 3c7b33992e06d81fb47d81bf8ccf7884f845b3ff Parents: 19097f2 Author: manishgupta88 <[email protected]> Authored: Mon Oct 8 19:38:54 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Oct 9 13:38:51 2018 +0530 ---------------------------------------------------------------------- .../carbondata/spark/load/CsvRDDHelper.scala | 4 +-- .../load/DataLoadProcessBuilderOnSpark.scala | 6 ++-- .../load/DataLoadProcessorStepOnSpark.scala | 6 ++-- .../apache/carbondata/spark/rdd/CarbonRDD.scala | 4 +-- .../spark/rdd/NewCarbonDataLoadRDD.scala | 33 -------------------- .../apache/spark/sql/util/SparkSQLUtil.scala | 21 ++++++++++++- .../spark/rdd/CarbonDataRDDFactory.scala | 4 +-- .../spark/sql/CarbonDictionaryDecoder.scala | 8 ++--- .../management/CarbonLoadDataCommand.scala | 7 +++-- .../command/mutation/DeleteExecution.scala | 7 ++--- .../command/mutation/HorizontalCompaction.scala | 8 ++--- 11 files changed, 46 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala index 8d6dd32..5511645 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -41,7 +42,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.spark.rdd.SerializableConfiguration import org.apache.carbondata.spark.util.CommonUtil object CsvRDDHelper { @@ -110,7 +110,7 @@ object CsvRDDHelper { closePartition() // 2. 
read function - val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf) val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { override def apply(file: PartitionedFile): Iterator[InternalRow] = { new Iterator[InternalRow] { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 2e74a94..923676c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.TaskContext import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.storage.StorageLevel import org.apache.carbondata.common.logging.LogServiceFactory @@ -35,7 +36,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.rdd.SerializableConfiguration /** * Use sortBy operator in spark to load the data @@ -66,7 +66,7 @@ object 
DataLoadProcessBuilderOnSpark { val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator") - val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) // 1. Input val inputRDD = originRDD .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)) @@ -121,7 +121,7 @@ object DataLoadProcessBuilderOnSpark { // 4. Write sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast, - writeStepRowCounter, conf)) + writeStepRowCounter, conf.value.value)) // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will // not have any functional impact as spark automatically monitors the cache usage on each node http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index f17bd91..f1a12bf 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -42,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} import 
org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} -import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow} +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} object DataLoadProcessorStepOnSpark { @@ -230,8 +230,8 @@ object DataLoadProcessorStepOnSpark { index: Int, modelBroadcast: Broadcast[CarbonLoadModel], rowCounter: Accumulator[Int], - conf: Broadcast[SerializableConfiguration]) { - ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) + conf: Configuration) { + ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf) var model: CarbonLoadModel = null var tableName: String = null var rowConverter: RowConverterImpl = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala index 3a02f85..83cd59c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -22,7 +22,6 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} -import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.util.SparkSQLUtil @@ -49,8 +48,7 @@ abstract class CarbonRDD[T: ClassTag]( @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf() - val config: Broadcast[SerializableConfiguration] = sparkContext - 
.broadcast(new SerializableConfiguration(hadoopConf)) + val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf) /** Construct an RDD with just a one-to-one dependency on one parent */ def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) = http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 87c8e4c..6076e4a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -54,39 +54,6 @@ import org.apache.carbondata.processing.util.CarbonQueryUtil import org.apache.carbondata.spark.DataLoadResult import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} -class SerializableConfiguration(@transient var value: Configuration) extends Serializable { - - @transient - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - private def writeObject(out: ObjectOutputStream): Unit = - try { - out.defaultWriteObject() - value.write(out) - } catch { - case e: IOException => - LOGGER.error(e, "Exception encountered") - throw e - case NonFatal(e) => - LOGGER.error(e, "Exception encountered") - throw new IOException(e) - } - - - private def readObject(in: ObjectInputStream): Unit = - try { - value = new Configuration(false) - value.readFields(in) - } catch { - case e: IOException => - LOGGER.error(e, "Exception encountered") - throw e - case NonFatal(e) => - LOGGER.error(e, "Exception encountered") - throw new IOException(e) - } -} - /** * This 
partition class use to split by Host * http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala index b7d47a0..9ffe6e1 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala @@ -19,11 +19,14 @@ package org.apache.spark.sql.util import java.lang.reflect.Method +import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkContext +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.{SessionState, SQLConf} -import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} +import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil} object SparkSQLUtil { def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState @@ -99,4 +102,20 @@ object SparkSQLUtil { throw new UnsupportedOperationException("Spark version not supported") } } + + /** + * Method to broadcast a variable using spark SerializableConfiguration class + * + * @param sparkContext + * @param hadoopConf + * @return + */ + def broadCastHadoopConf(sparkContext: SparkContext, + hadoopConf: Configuration): Broadcast[SerializableConfiguration] = { + sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf)) + } + + def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = { + new 
SerializableConfiguration(hadoopConf) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 6350b50..0ec3bc6 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel} import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.util.CarbonException +import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory @@ -728,7 +728,7 @@ object CarbonDataRDDFactory { // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism, // so segmentIdIndex=partitionId/parallelism, this has been verified. 
- val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf) partitionByRdd.map(_._2).mapPartitions { partition => ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) val partitionId = TaskContext.getPartitionId() http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index d0ed56e..ff7ac60 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode} import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.SparkTypeConverter +import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -44,7 +44,7 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.spark.CarbonAliasDecoderRelation -import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration} +import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo /** * It decodes the data. 
@@ -76,8 +76,8 @@ case class CarbonDictionaryDecoder( (carbonTable.getTableName, carbonTable) }.toMap - val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession - .sessionState.newHadoopConf())) + val conf = SparkSQLUtil + .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf()) if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) { val dataTypes = child.output.map { attr => attr.dataType } child.execute().mapPartitions { iter => http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index f7a5f42..43c8b86 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSou import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} @@ -78,7 +79,7 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import 
org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark} -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration} +import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil} case class CarbonLoadDataCommand( @@ -986,8 +987,8 @@ case class CarbonLoadDataCommand( array } } - val conf = sparkSession.sparkContext - .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())) + val conf = SparkSQLUtil + .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf()) val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) => DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 4921b33..7e7f671 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import 
org.apache.carbondata.core.constants.CarbonCommonConstants @@ -49,7 +50,6 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.exception.MultipleMatchingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.spark.DeleteDelataResultImpl -import org.apache.carbondata.spark.rdd.SerializableConfiguration object DeleteExecution { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) @@ -120,9 +120,8 @@ object DeleteExecution { blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, keyRdd.partitions.length) - val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession - .sessionState.newHadoopConf())) - + val conf = SparkSQLUtil + .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf()) val rdd = rowContRdd.join(keyRdd) res = rdd.mapPartitionsWithIndex( (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala index 66066ed..35fc3c3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala @@ -26,16 +26,16 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.command.AlterTableModel import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand import 
org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager -import org.apache.carbondata.core.util.{ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} -import org.apache.carbondata.spark.rdd.SerializableConfiguration object HorizontalCompaction { @@ -191,8 +191,8 @@ object HorizontalCompaction { val timestamp = factTimeStamp val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails - val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession - .sessionState.newHadoopConf())) + val conf = SparkSQLUtil + .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf()) val result = rdd1.mapPartitions(iter => new Iterator[Seq[CarbonDataMergerUtilResult]] { ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
