Added set/reset commands in carbon to update/reset properties dynamically
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/95ce1da1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/95ce1da1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/95ce1da1 Branch: refs/heads/streaming_ingest Commit: 95ce1da1e6a828255ca6385ae5ab16706e66483f Parents: 28e2e17 Author: Manohar <[email protected]> Authored: Mon Jun 12 18:06:25 2017 +0530 Committer: Manohar <[email protected]> Committed: Tue Jun 27 14:39:51 2017 +0530 ---------------------------------------------------------------------- .../core/util/ThreadLocalSessionParams.java | 34 +++++++++++++++ .../spark/rdd/AlterTableAddColumnRDD.scala | 9 ++-- .../spark/rdd/AlterTableDropColumnRDD.scala | 10 ++--- .../spark/rdd/CarbonCleanFilesRDD.scala | 8 +--- .../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 9 +--- .../spark/rdd/CarbonDeleteLoadRDD.scala | 9 +--- .../spark/rdd/CarbonDropTableRDD.scala | 12 ++--- .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 34 ++++----------- .../spark/rdd/CarbonIUDMergerRDD.scala | 3 -- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 8 +--- .../apache/carbondata/spark/rdd/CarbonRDD.scala | 46 ++++++++++++++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +--- .../spark/rdd/DataLoadCoalescedRDD.scala | 15 +++---- .../spark/rdd/NewCarbonDataLoadRDD.scala | 42 +++++++----------- .../spark/rdd/UpdateCoalescedRDD.scala | 10 ++--- .../carbondata/spark/rdd/UpdateDataLoad.scala | 4 +- .../sql/CarbonDatasourceHadoopRelation.scala | 7 ++- .../spark/sql/hive/CarbonStrategies.scala | 4 +- .../execution/command/CarbonHiveCommands.scala | 16 +------ .../spark/rdd/CarbonDataRDDFactory.scala | 16 +++---- .../sql/CarbonDatasourceHadoopRelation.scala | 3 ++ .../spark/sql/CarbonDictionaryDecoder.scala | 5 ++- .../scala/org/apache/spark/sql/CarbonEnv.scala | 12 ++++- .../org/apache/spark/sql/CarbonSource.scala | 5 +-- .../execution/CarbonLateDecodeStrategy.scala | 3 +- 
.../execution/CastExpressionOptimization.scala | 6 +-- .../execution/command/CarbonHiveCommands.scala | 26 ++++++++--- .../sql/execution/command/DDLStrategy.scala | 4 +- .../execution/command/carbonTableSchema.scala | 16 +++---- .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++++--- .../spark/sql/hive/CarbonSessionState.scala | 2 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 15 ++++--- 32 files changed, 217 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java new file mode 100644 index 0000000..354a0ee --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.util; + +/** + * This class maintains ThreadLocal session params + */ +public class ThreadLocalSessionParams { + static final InheritableThreadLocal<SessionParams> threadLocal = + new InheritableThreadLocal<SessionParams>(); + + public static void setSessionParams(SessionParams sessionParams) { + threadLocal.set(sessionParams); + } + + public static SessionParams getSessionParams() { + return threadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala index 61e1e61..7eea95d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala @@ -50,24 +50,21 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par class AlterTableAddColumnRDD[K, V](sc: SparkContext, @transient newColumns: Seq[ColumnSchema], carbonTableIdentifier: CarbonTableIdentifier, - carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) { + carbonStorePath: String) + extends CarbonRDD[(Int, String)](sc, Nil) { val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS) - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def getPartitions: Array[Partition] = { newColumns.zipWithIndex.map { column => new AddColumnPartition(id, column._2, column._1) }.toArray } - override def compute(split: Partition, + override def internalCompute(split: 
Partition, context: TaskContext): Iterator[(Int, String)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS val iter = new Iterator[(Int, String)] { try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala index ba91673..fde5cd6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala @@ -26,7 +26,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.core.util.CarbonProperties /** * This is a partitioner class for dividing the newly added columns into partitions @@ -49,9 +48,8 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa class AlterTableDropColumnRDD[K, V](sc: SparkContext, @transient newColumns: Seq[ColumnSchema], carbonTableIdentifier: CarbonTableIdentifier, - carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + carbonStorePath: String) + extends CarbonRDD[(Int, String)](sc, Nil) { override def getPartitions: Array[Partition] = { 
newColumns.zipWithIndex.map { column => @@ -59,11 +57,9 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext, }.toArray } - override def compute(split: Partition, + override def internalCompute(split: Partition, context: TaskContext): Iterator[(Int, String)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS val iter = new Iterator[(Int, String)] { try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala index c1a30b7..b63fc48 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala @@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -34,21 +33,18 @@ class CarbonCleanFilesRDD[V: ClassTag]( databaseName: String, tableName: String, partitioner: Partitioner) - extends RDD[V](sc, Nil) { + extends CarbonRDD[V](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") - private val addedProperies = CarbonProperties.getInstance().getAddedProperies override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, 
tableName, null) splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1)) } - override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = { val iter = new Iterator[(V)] { - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala index f7bed59..da391cf 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala @@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.LoadMetadataDetails -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.DeletedLoadResult import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -40,12 +39,10 @@ class CarbonDeleteLoadByDateRDD[K, V]( dimTableName: String, storePath: String, loadMetadataDetails: List[LoadMetadataDetails]) - extends RDD[(K, V)](sc, Nil) { + extends CarbonRDD[(K, V)](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def 
getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {s => @@ -53,10 +50,8 @@ class CarbonDeleteLoadByDateRDD[K, V]( } } - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { new Iterator[(K, V)] { - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala index 3ef9cef..9e43d0e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala @@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -35,11 +34,9 @@ class CarbonDeleteLoadRDD[V: ClassTag]( databaseName: String, tableName: String, partitioner: Partitioner) - extends RDD[V](sc, Nil) { + extends CarbonRDD[V](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def getPartitions: 
Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {f => @@ -47,10 +44,8 @@ class CarbonDeleteLoadRDD[V: ClassTag]( } } - override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = { val iter = new Iterator[V] { - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala index 54f8ea5..d1d49b9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala @@ -22,7 +22,6 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -31,12 +30,10 @@ class CarbonDropTableRDD[V: ClassTag]( valueClass: Value[V], databaseName: String, tableName: String) - extends RDD[V](sc, Nil) { + extends CarbonRDD[V](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def getPartitions: Array[Partition] = { 
val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map { s => @@ -44,12 +41,9 @@ class CarbonDropTableRDD[V: ClassTag]( } } - override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { - - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = { - val iter = new Iterator[V] { + val iter = new Iterator[V] { // TODO: Clear Btree from memory var havePair = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 434fb3c..d0f9362 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -176,19 +176,15 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S class CarbonAllDictionaryCombineRDD( prev: RDD[(String, Iterable[String])], model: DictionaryLoadModel) - extends RDD[(Int, ColumnDistinctValues)](prev) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + extends CarbonRDD[(Int, ColumnDistinctValues)](prev) { override def getPartitions: Array[Partition] = { firstParent[(String, Iterable[String])].partitions } - override def compute(split: Partition, context: TaskContext + override def internalCompute(split: Partition, context: TaskContext ): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])] /* * for all dictionary, all columns need to encoding and checking @@ -273,17 +269,12 @@ class StringArrayRow(var values: Array[String]) extends Row { class CarbonBlockDistinctValuesCombineRDD( prev: RDD[Row], model: DictionaryLoadModel) - extends RDD[(Int, ColumnDistinctValues)](prev) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + extends CarbonRDD[(Int, ColumnDistinctValues)](prev) { override def getPartitions: Array[Partition] = firstParent[Row].partitions - - override def compute(split: Partition, + override def internalCompute(split: Partition, context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. 
- CarbonProperties.getInstance().setProperties(addedProperies) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, model.hdfsLocation) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() @@ -338,16 +329,13 @@ class CarbonBlockDistinctValuesCombineRDD( class CarbonGlobalDictionaryGenerateRDD( prev: RDD[(Int, ColumnDistinctValues)], model: DictionaryLoadModel) - extends RDD[(Int, String, Boolean)](prev) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + extends CarbonRDD[(Int, String, Boolean)](prev) { override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions - override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = { + override def internalCompute(split: Partition, + context: TaskContext): Iterator[(Int, String, Boolean)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. 
- CarbonProperties.getInstance().setProperties(addedProperies) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, model.hdfsLocation) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS @@ -544,9 +532,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, dimensions: Array[CarbonDimension], hdfsLocation: String, dictFolderPath: String) - extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) { override def getPartitions: Array[Partition] = { val primDimensions = dictionaryLoadModel.primDimensions @@ -558,10 +544,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, result } - override def compute(split: Partition, context: TaskContext) + override def internalCompute(split: Partition, context: TaskContext) : Iterator[(Int, ColumnDistinctValues)] = { - // Add the properties added in driver to executor. 
- CarbonProperties.getInstance().setProperties(addedProperies) val theSplit = split.asInstanceOf[CarbonColumnDictPatition] val primDimension = theSplit.preDefDictDimension // read the column dict data http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 38e3680..277005b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonDataMergerUtil @@ -51,8 +50,6 @@ class CarbonIUDMergerRDD[K, V]( carbonMergerMapping, confExecutorsTemp) { - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def getPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier( http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index dec3ee3..908043a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -59,7 +59,7 @@ class CarbonMergerRDD[K, V]( carbonLoadModel: CarbonLoadModel, carbonMergerMapping: CarbonMergerMapping, confExecutorsTemp: String) - extends RDD[(K, V)](sc, Nil) { + extends CarbonRDD[(K, V)](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") sc.setLocalProperty("spark.job.interruptOnCancel", "true") @@ -74,12 +74,8 @@ class CarbonMergerRDD[K, V]( val factTableName = carbonMergerMapping.factTableName val tableId = carbonMergerMapping.tableId - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // Add the properties added in driver to executor. 
- CarbonProperties.getInstance().setProperties(addedProperies) val iter = new Iterator[(K, V)] { carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala new file mode 100644 index 0000000..e00dd0f --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD + +import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams} + +/** + * This RDD maintains session level ThreadLocal + */ +abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext, + @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) { + + val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams + + /** Construct an RDD with just a one-to-one dependency on one parent */ + def this(@transient oneParent: RDD[_]) = + this (oneParent.context, List(new OneToOneDependency(oneParent))) + + // RDD compute logic should be here + def internalCompute(split: Partition, context: TaskContext): Iterator[T] + + final def compute(split: Partition, context: TaskContext): Iterator[T] = { + ThreadLocalSessionParams.setSessionParams(sessionParams) + internalCompute(split, context) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 2c10e65..3868342 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -54,7 +54,7 @@ class CarbonScanRDD( filterExpression: Expression, identifier: AbsoluteTableIdentifier, @transient carbonTable: CarbonTable) - extends RDD[InternalRow](sc, Nil) { + extends CarbonRDD[InternalRow](sc, Nil) { private val queryId = sparkContext.getConf.get("queryId", 
System.nanoTime() + "") private val jobTrackerId: String = { @@ -67,8 +67,6 @@ class CarbonScanRDD( private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName) - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - @transient private val jobId = new JobID(jobTrackerId, id) @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -175,15 +173,13 @@ class CarbonScanRDD( result.toArray(new Array[Partition](result.size())) } - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties" ) } - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala index 5da0835..b2d04ac 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala @@ -21,26 +21,21 @@ import scala.reflect.ClassTag import org.apache.spark._ -import 
org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.rdd.CarbonRDD case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition) class DataLoadCoalescedRDD[T: ClassTag]( - @transient var prev: RDD[T], - nodeList: Array[String]) - extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + @transient var prev: RDD[T], + nodeList: Array[String]) + extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) { override def getPartitions: Array[Partition] = { new DataLoadPartitionCoalescer(prev, nodeList).run } - override def compute(split: Partition, + override def internalCompute(split: Partition, context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = { - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) - new Iterator[DataLoadPartitionWrap[T]] { val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator def hasNext = iter.hasNext http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 5790369..129c642 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -20,7 +20,6 @@ package org.apache.carbondata.spark.rdd import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import java.nio.ByteBuffer import java.text.SimpleDateFormat -import java.util import java.util.{Date, UUID} import 
scala.collection.JavaConverters._ @@ -127,16 +126,12 @@ class SparkPartitionLoader(model: CarbonLoadModel, var storeLocation: String = "" - def initialize(addedProperies: util.Map[String, String]): Unit = { + def initialize(): Unit = { val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") } - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId) CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true") CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1") @@ -177,7 +172,7 @@ class NewCarbonDataLoadRDD[K, V]( loadCount: Integer, blocksGroupBy: Array[(String, Array[BlockDetails])], isTableSplitPartition: Boolean) - extends RDD[(K, V)](sc, Nil) { + extends CarbonRDD[(K, V)](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") @@ -190,8 +185,6 @@ class NewCarbonDataLoadRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration)) - private val addedProperies = CarbonProperties.getInstance().getAddedProperies - override def getPartitions: Array[Partition] = { if (isTableSplitPartition) { // for table split partition @@ -222,7 +215,7 @@ class NewCarbonDataLoadRDD[K, V]( // Do nothing. Hadoop RDD should not be checkpointed. 
} - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { var partitionID = "0" @@ -246,7 +239,7 @@ class NewCarbonDataLoadRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize(addedProperies) + loader.initialize() new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) @@ -391,17 +384,16 @@ class NewCarbonDataLoadRDD[K, V]( * @see org.apache.carbondata.processing.newflow.DataLoadExecutor */ class NewDataFrameLoaderRDD[K, V]( - sc: SparkContext, - result: DataLoadResult[K, V], - carbonLoadModel: CarbonLoadModel, - loadCount: Integer, - tableCreationTime: Long, - schemaLastUpdatedTime: Long, - prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) { + sc: SparkContext, + result: DataLoadResult[K, V], + carbonLoadModel: CarbonLoadModel, + loadCount: Integer, + tableCreationTime: Long, + schemaLastUpdatedTime: Long, + prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) { - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { val partitionID = "0" @@ -438,7 +430,7 @@ class NewDataFrameLoaderRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize(addedProperies) + loader.initialize() new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray) } catch { case e: BadRecordFoundException => @@ -593,11 +585,9 @@ class PartitionTableDataLoaderRDD[K, V]( loadCount: 
Integer, tableCreationTime: Long, schemaLastUpdatedTime: Long, - prev: RDD[Row]) extends RDD[(K, V)](prev) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) { - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { val partitionID = "0" @@ -625,7 +615,7 @@ class PartitionTableDataLoaderRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize(addedProperies) + loader.initialize() new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) } catch { case e: BadRecordFoundException => http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala index 30050f7..1025da7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala @@ -22,25 +22,21 @@ import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.rdd.CarbonRDD // This RDD distributes previous RDD data based on number of nodes. 
i.e., one partition for one node class UpdateCoalescedRDD[T: ClassTag]( @transient var prev: RDD[T], nodeList: Array[String]) - extends RDD[T](prev.context, Nil) { - - private val addedProperies = CarbonProperties.getInstance().getAddedProperies + extends CarbonRDD[T](prev.context, Nil) { override def getPartitions: Array[Partition] = { new DataLoadPartitionCoalescer(prev, nodeList).run } - override def compute(split: Partition, + override def internalCompute(split: Partition, context: TaskContext): Iterator[T] = { - // Add the properties added in driver to executor. - CarbonProperties.getInstance().setProperties(addedProperies) // This iterator combines data from all the parent partitions new Iterator[T] { val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index 6b94894..bcfc096 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -17,8 +17,6 @@ package org.apache.carbondata.spark.rdd -import java.util - import scala.collection.mutable import org.apache.spark.TaskContext @@ -54,7 +52,7 @@ object UpdateDataLoad { segId, loadMetadataDetails) // Intialize to set carbon properties - loader.initialize(new util.HashMap) + loader.initialize() loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) new DataLoadExecutor().execute(carbonLoadModel, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 0e6153f..2fc93e6 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -42,6 +42,7 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader} import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.{CarbonFilters, CarbonOption} +import org.apache.carbondata.spark.rdd.CarbonRDD import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl private[sql] case class CarbonDatasourceHadoopRelation( @@ -94,7 +95,6 @@ private[sql] case class CarbonDatasourceHadoopRelation( requiredColumns.foreach(projection.addColumn) CarbonInputFormat.setColumnProjection(conf, projection) CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl]) - new CarbonHadoopFSRDD[Row](sqlContext.sparkContext, new SerializableConfiguration(conf), absIdentifier, @@ -120,7 +120,7 @@ class CarbonHadoopFSRDD[V: ClassTag]( identifier: AbsoluteTableIdentifier, inputFormatClass: Class[_ <: CarbonInputFormat[V]], valueClass: Class[V]) - extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil { + extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil { private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -128,8 +128,7 @@ class CarbonHadoopFSRDD[V: ClassTag]( } @transient protected val jobId = new JobID(jobTrackerId, id) - 
@DeveloperApi - override def compute(split: Partition, + override def internalCompute(split: Partition, context: TaskContext): Iterator[V] = { val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 7bfd742..f0cd33b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _} import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan} -import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan} +import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand} @@ -316,8 +316,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } else { ExecutedCommand(HiveNativeCommand(sql)) :: Nil } - case set@SetCommand(kv) => - ExecutedCommand(CarbonSetCommand(set)) :: Nil case _ => Nil } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index d047b20..0f42940 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -18,12 +18,10 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql._ -import org.apache.spark.sql.execution.{RunnableCommand, SetCommand} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.execution.command.DropTableCommand import org.apache.spark.sql.hive.execution.HiveNativeCommand -import org.apache.carbondata.core.util.CarbonProperties - private[hive] case class CreateDatabaseCommand(dbName: String, command: HiveNativeCommand) extends RunnableCommand { def run(sqlContext: SQLContext): Seq[Row] = { @@ -55,15 +53,3 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String, rows } } - -case class CarbonSetCommand(command: SetCommand) - extends RunnableCommand { - - override val output = command.output - - override def run(sparkSession: SQLContext): Seq[Row] = { - val rows = command.run(sparkSession) - CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1)) - rows - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 2b77654..48af516 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -128,7 +128,7 @@ object CarbonDataRDDFactory { isCompactionTriggerByDDl ) - val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams + val isConcurrentCompactionAllowed = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ) @@ -275,8 +275,8 @@ object CarbonDataRDDFactory { exception = e } // continue in case of exception also, check for all the tables. - val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession). 
- sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + val isConcurrentCompactionAllowed = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ).equalsIgnoreCase("true") @@ -397,8 +397,8 @@ object CarbonDataRDDFactory { } storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession) - .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + val isConcurrentCompactionAllowed = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ) .equalsIgnoreCase("true") @@ -1042,8 +1042,7 @@ object CarbonDataRDDFactory { val timeStampFormat = if (specificFormat.isDefined) { new SimpleDateFormat(specificFormat.get) } else { - val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession) - .sessionParams.getProperty(CarbonCommonConstants + val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) new SimpleDateFormat(timestampFormatString) } @@ -1051,8 +1050,7 @@ object CarbonDataRDDFactory { val dateFormat = if (specificFormat.isDefined) { new SimpleDateFormat(specificFormat.get) } else { - val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession) - .sessionParams.getProperty(CarbonCommonConstants + val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) new SimpleDateFormat(dateFormatString) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index b0044d7..7c096d3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.expression.logical.AndExpression +import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams} import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.hadoop.util.SchemaReader import org.apache.carbondata.processing.merger.TableMeta @@ -52,6 +53,8 @@ case class CarbonDatasourceHadoopRelation( absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession) .asInstanceOf[CarbonRelation] + val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams + ThreadLocalSessionParams.setSessionParams(sessionParams) override def sqlContext: SQLContext = sparkSession.sqlContext override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 49cf54f..bd1c8b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala 
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.spark.CarbonAliasDecoderRelation +import org.apache.carbondata.spark.rdd.CarbonRDD /** * It decodes the data. @@ -444,7 +445,7 @@ class CarbonDecoderRDD( prev: RDD[InternalRow], output: Seq[Attribute], sparkSession: SparkSession) - extends RDD[InternalRow](prev) { + extends CarbonRDD[InternalRow](prev) { private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath @@ -513,7 +514,7 @@ class CarbonDecoderRDD( dictIds } - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val absoluteTableIdentifiers = relations.map { relation => val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 0851ec2..78820ea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -21,10 +21,11 @@ import java.util.Map import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog} +import org.apache.spark.sql.internal.CarbonSQLConf import org.apache.carbondata.common.logging.LogServiceFactory import 
org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.{CarbonProperties, SessionParams} +import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams} import org.apache.carbondata.spark.rdd.SparkReadSupport import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl @@ -48,11 +49,18 @@ class CarbonEnv { sparkSession.udf.register("getTupleId", () => "") if (!initialized) { sessionParams = new SessionParams() + ThreadLocalSessionParams.setSessionParams(sessionParams) + val config = new CarbonSQLConf(sparkSession) + if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) { + config.addDefaultCarbonParams() + } + // add session params after adding DefaultCarbonParams + config.addDefaultCarbonSessionParams() carbonMetastore = { val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) LOGGER.info(s"carbon env initial: $storePath") - new CarbonMetastore(sparkSession.conf, storePath, sessionParams) + new CarbonMetastore(sparkSession.conf, storePath) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") initialized = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 3079c84..1c16143 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider None) case _ => val options = new CarbonOption(parameters) - val storePath = 
CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams + val storePath = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.STORE_LOCATION) val tablePath = storePath + "/" + options.dbName + "/" + options.tableName CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None) @@ -77,8 +77,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider "specified when creating CarbonContext") val options = new CarbonOption(parameters) - val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams - .getProperty(CarbonCommonConstants.STORE_LOCATION) + val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName) val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) .exists(tablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 8d0b4ea..4605914 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -520,8 +520,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) { System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) } else { - CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams - .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala index 805a4df..a8985b9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala @@ -24,7 +24,8 @@ import java.util.{Locale, TimeZone} import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} -import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources} +import org.apache.spark.sql.CastExpr +import org.apache.spark.sql.sources import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -34,8 +35,7 @@ object CastExpressionOptimization { def typeCastStringToLong(v: Any): Any = { - val parser: SimpleDateFormat = new SimpleDateFormat( - CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams + val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala index 627de02..a4feead 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand} +import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand} import org.apache.carbondata.core.util.CarbonProperties @@ -49,10 +49,26 @@ case class CarbonSetCommand(command: SetCommand) override val output = command.output override def run(sparkSession: SparkSession): Seq[Row] = { - val rows = command.run(sparkSession) - CarbonEnv.getInstance(sparkSession).sessionParams - .addProperty(rows.head.getString(0), rows.head.getString(1)) - rows + val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams + command.kv match { + case Some((key, Some(value))) => + val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key) + if (isCarbonProperty) { + sessionParms.addProperty(key, value) + } + case _ => + + } + command.run(sparkSession) } } +case class CarbonResetCommand() + extends RunnableCommand { + override val output = ResetCommand.output + + override def run(sparkSession: SparkSession): Seq[Row] = { + CarbonEnv.getInstance(sparkSession).sessionParams.clear() + ResetCommand.run(sparkSession) + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 35be543..7d0215f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} -import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand} +import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil case set@SetCommand(kv) => ExecutedCommandExec(CarbonSetCommand(set)) :: Nil + case reset@ResetCommand => + ExecutedCommandExec(CarbonResetCommand()) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala 
index f1fd05b..0064c21 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) carbonLoadModel.setStorePath(relation.tableMeta.storePath) - var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams + var storeLocation = CarbonProperties.getInstance .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, System.getProperty("java.io.tmpdir") ) @@ -359,8 +359,7 @@ case class LoadTable( sys.error(s"Data loading failed. table not found: $dbName.$tableName") } - CarbonEnv.getInstance(sparkSession).sessionParams - .addProperty("zookeeper.enable.lock", "false") + CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") val carbonLock = CarbonLockFactory .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier, @@ -409,7 +408,7 @@ case class LoadTable( val columnDict = options.getOrElse("columndict", null) val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N") val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false") - val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams + val badRecordActionValue = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue) @@ -429,12 +428,11 @@ case class LoadTable( carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#")) carbonLoadModel.setDateFormat(dateFormat) - 
carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession) - .sessionParams.getProperty( + carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams. - getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel .setSerializationNullFormat( @@ -536,7 +534,7 @@ case class LoadTable( allDictionaryPath) } // dictionaryServerClient dictionary generator - val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams + val dictionaryServerPort = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) val sparkDriverHost = sparkSession.sqlContext.sparkContext. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index 54cffc2..04a94ce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession} +import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca import org.apache.carbondata.core.reader.ThriftReader import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: 
SessionParams) { +class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { @transient val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -201,15 +201,18 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: // if zookeeper is configured as carbon lock type. val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) if (null != zookeeperurl) { - sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) + CarbonProperties.getInstance + .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) } if (metadataPath == null) { return null } // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) && + if (null == CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.LOCK_TYPE) && FileType.HDFS == FileFactory.getFileType(metadataPath)) { - sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE, + CarbonProperties.getInstance + .addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS ) LOGGER.info("Default lock type HDFSLOCK is configured") http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala index 156a12e..4aef118 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -115,7 +115,7 @@ class CarbonSessionCatalog( */ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) { - override lazy val 
sqlParser: ParserInterface = new CarbonSparkSqlParser(conf) + override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) experimentalMethods.extraStrategies = Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 258920b..3412fb0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -18,30 +18,33 @@ package org.apache.spark.sql.parser import scala.collection.mutable +import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser} import org.apache.spark.sql.catalyst.parser.ParserUtils._ -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, -TablePropertyListContext} +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, -PartitionerField, TableModel} +import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} +import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams} import org.apache.carbondata.spark.CarbonOption import 
org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** - * Concrete parser for Spark SQL statements and carbon specific statements + * Concrete parser for Spark SQL statements and carbon specific + * statements */ -class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser { +class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { val astBuilder = new CarbonSqlAstBuilder(conf) private val substitutor = new VariableSubstitution(conf) override def parsePlan(sqlText: String): LogicalPlan = { + val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams + ThreadLocalSessionParams.setSessionParams(sessionParams) try { super.parsePlan(sqlText) } catch {
