Adding session-based properties. Added a SET command in Carbon to update properties dynamically.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28e2e171 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28e2e171 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28e2e171 Branch: refs/heads/streaming_ingest Commit: 28e2e171db578e2467be55939d2da9a5f1b70d09 Parents: 2234ec8 Author: ravipesala <[email protected]> Authored: Thu May 18 15:04:17 2017 +0530 Committer: Manohar <[email protected]> Committed: Tue Jun 27 14:39:50 2017 +0530 ---------------------------------------------------------------------- .../carbondata/core/util/CarbonProperties.java | 23 +++++++ .../carbondata/core/util/SessionParams.java | 70 ++++++++++++++++++++ .../testsuite/commands/SetCommandTestCase.scala | 34 ++++++++++ .../spark/rdd/AlterTableAddColumnRDD.scala | 4 ++ .../spark/rdd/AlterTableDropColumnRDD.scala | 5 ++ .../spark/rdd/CarbonCleanFilesRDD.scala | 5 ++ .../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 5 ++ .../spark/rdd/CarbonDeleteLoadRDD.scala | 5 ++ .../spark/rdd/CarbonDropTableRDD.scala | 6 ++ .../spark/rdd/CarbonGlobalDictionaryRDD.scala | 17 ++++- .../spark/rdd/CarbonIUDMergerRDD.scala | 3 + .../carbondata/spark/rdd/CarbonMergerRDD.scala | 4 ++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 4 ++ .../spark/rdd/DataLoadCoalescedRDD.scala | 6 ++ .../spark/rdd/NewCarbonDataLoadRDD.scala | 17 +++-- .../spark/rdd/UpdateCoalescedRDD.scala | 7 +- .../carbondata/spark/rdd/UpdateDataLoad.scala | 4 +- .../spark/sql/hive/CarbonStrategies.scala | 4 +- .../execution/command/CarbonHiveCommands.scala | 16 ++++- .../spark/rdd/CarbonDataRDDFactory.scala | 16 +++-- .../scala/org/apache/spark/sql/CarbonEnv.scala | 9 ++- .../org/apache/spark/sql/CarbonSource.scala | 5 +- .../execution/CarbonLateDecodeStrategy.scala | 3 +- .../execution/CastExpressionOptimization.scala | 6 +- .../execution/command/CarbonHiveCommands.scala | 18 ++++- .../sql/execution/command/DDLStrategy.scala | 10 +-- 
.../execution/command/carbonTableSchema.scala | 16 +++-- .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++--- 28 files changed, 290 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 90c6ffa..0142e38 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -21,6 +21,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.logging.LogService; @@ -47,6 +49,11 @@ public final class CarbonProperties { private Properties carbonProperties; /** + * Properties added at runtime; a concurrent map because parallel tasks update it. + */ + private Map<String, String> setProperties = new ConcurrentHashMap<>(); + + /** * Private constructor this will call load properties method to load all the * carbon properties in memory. */ @@ -447,10 +454,26 @@ public final class CarbonProperties { * @return properties value */ public CarbonProperties addProperty(String key, String value) { + setProperties.put(key, value); carbonProperties.setProperty(key, value); return this; } + /** + * Get all the properties added at runtime through addProperty/setProperties.
+ * @return the live map of added properties (not a copy) + */ + public Map<String, String> getAddedProperies() { + return setProperties; + } + + public void setProperties(Map<String, String> newProperties) { + setProperties.putAll(newProperties); + for (Map.Entry<String, String> entry : newProperties.entrySet()) { + carbonProperties.setProperty(entry.getKey(), entry.getValue()); + } + } + private ColumnarFormatVersion getDefaultFormatVersion() { return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java new file mode 100644 index 0000000..781b898 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -0,0 +1,70 @@ +package org.apache.carbondata.core.util; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Session level properties: a value set here takes precedence over CarbonProperties.
+ */ +public class SessionParams implements Serializable { + + protected transient CarbonProperties properties; + + private Map<String, String> sProps; + + public SessionParams() { + sProps = new HashMap<>(); + properties = CarbonProperties.getInstance(); + } + + public SessionParams(SessionParams sessionParams) { + this(); + sProps.putAll(sessionParams.sProps); + } + + /** + * This method will be used to get the properties value + * + * @param key + * @return properties value + */ + public String getProperty(String key) { + String s = sProps.get(key); + if (s == null) { + s = CarbonProperties.getInstance().getProperty(key); // 'properties' is transient + } + return s; + } + + /** + * This method will be used to get the properties value if property is not + * present then it will return the default value + * + * @param key + * @return properties value + */ + public String getProperty(String key, String defaultValue) { + String value = sProps.get(key); + if (value == null) { + value = CarbonProperties.getInstance().getProperty(key, defaultValue); + } + return value; + } + + /** + * This method will be used to add a new property + * + * @param key + * @return this instance, to allow call chaining + */ + public SessionParams addProperty(String key, String value) { + sProps.put(key, value); + return this; + } + + public void setProperties(Map<String, String> newProperties) { + sProps.putAll(newProperties); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala new file mode 100644 index 0000000..28e2dbf --- /dev/null +++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.commands + +import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.util.CarbonProperties + +class SetCommandTestCase extends QueryTest with BeforeAndAfterAll { + + test("test set command") { + + sql("set key1=value1") + + assert(CarbonProperties.getInstance().getProperty("key1") == "value1", "SET did not update CarbonProperties") + assert(sqlContext.getConf("key1") == "value1", "SET did not update the SQL conf") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala index d81ed30..61e1e61 100644 ---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala @@ -55,6 +55,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext, val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS) + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { newColumns.zipWithIndex.map { column => new AddColumnPartition(id, column._2, column._1) @@ -64,6 +66,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext, override def compute(split: Partition, context: TaskContext): Iterator[(Int, String)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS val iter = new Iterator[(Int, String)] { try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala index 53796bb..ba91673 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala @@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonTableIdentifier import 
org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.util.CarbonProperties /** * This is a partitioner class for dividing the newly added columns into partitions @@ -50,6 +51,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext, carbonTableIdentifier: CarbonTableIdentifier, carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { newColumns.zipWithIndex.map { column => new DropColumnPartition(id, column._2, column._1) @@ -59,6 +62,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext, override def compute(split: Partition, context: TaskContext): Iterator[(Int, String)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS val iter = new Iterator[(Int, String)] { try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala index 9cc46c1..c1a30b7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner +import 
org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -37,6 +38,8 @@ class CarbonCleanFilesRDD[V: ClassTag]( sc.setLocalProperty("spark.scheduler.pool", "DDL") + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1)) @@ -44,6 +47,8 @@ class CarbonCleanFilesRDD[V: ClassTag]( override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { val iter = new Iterator[(V)] { + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala index f9a7bdd..f7bed59 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.DeletedLoadResult import 
org.apache.carbondata.spark.util.CarbonQueryUtil @@ -43,6 +44,8 @@ class CarbonDeleteLoadByDateRDD[K, V]( sc.setLocalProperty("spark.scheduler.pool", "DDL") + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {s => @@ -52,6 +55,8 @@ class CarbonDeleteLoadByDateRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { new Iterator[(K, V)] { + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala index 26e1abc..3ef9cef 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -37,6 +38,8 @@ class CarbonDeleteLoadRDD[V: ClassTag]( extends RDD[V](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") + private val addedProperies = 
CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {f => @@ -46,6 +49,8 @@ class CarbonDeleteLoadRDD[V: ClassTag]( override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { val iter = new Iterator[V] { + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala index dc63098..54f8ea5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala @@ -22,6 +22,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.Value import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -34,6 +35,8 @@ class CarbonDropTableRDD[V: ClassTag]( sc.setLocalProperty("spark.scheduler.pool", "DDL") + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map { s => @@ -43,6 +46,9 @@ class 
CarbonDropTableRDD[V: ClassTag]( override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) + val iter = new Iterator[V] { // TODO: Clear Btree from memory http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 1e33188..434fb3c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -178,6 +178,8 @@ class CarbonAllDictionaryCombineRDD( model: DictionaryLoadModel) extends RDD[(Int, ColumnDistinctValues)](prev) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { firstParent[(String, Iterable[String])].partitions } @@ -185,7 +187,8 @@ class CarbonAllDictionaryCombineRDD( override def compute(split: Partition, context: TaskContext ): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])] /* * for all dictionary, all columns need to encoding and checking @@ -272,11 +275,15 @@ class CarbonBlockDistinctValuesCombineRDD( model: DictionaryLoadModel) extends RDD[(Int, ColumnDistinctValues)](prev) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = firstParent[Row].partitions override def compute(split: Partition, context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, model.hdfsLocation) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime() @@ -333,10 +340,14 @@ class CarbonGlobalDictionaryGenerateRDD( model: DictionaryLoadModel) extends RDD[(Int, String, Boolean)](prev) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, model.hdfsLocation) val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS @@ -535,6 +546,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, dictFolderPath: String) extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val primDimensions = dictionaryLoadModel.primDimensions val primDimLength = primDimensions.length @@ -547,6 +560,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel, override def compute(split: Partition, context: TaskContext) : Iterator[(Int, ColumnDistinctValues)] = { + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) val theSplit = split.asInstanceOf[CarbonColumnDictPatition] val primDimension = theSplit.preDefDictDimension // read the column dict data http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 277005b..38e3680 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.merger.CarbonDataMergerUtil @@ -50,6 +51,8 @@ class CarbonIUDMergerRDD[K, V]( carbonMergerMapping, confExecutorsTemp) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier( http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index caa389a..dec3ee3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -74,8 +74,12 @@ class CarbonMergerRDD[K, V]( val factTableName = carbonMergerMapping.factTableName val tableId = carbonMergerMapping.tableId + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) val iter = new Iterator[(K, V)] { carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 4807b90..2c10e65 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -67,6 +67,8 @@ class CarbonScanRDD( private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName) + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + @transient private val jobId = new JobID(jobTrackerId, id) @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -180,6 +182,8 @@ class CarbonScanRDD( System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties" ) } + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala index 7395e43..5da0835 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala @@ -21,6 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.carbondata.core.util.CarbonProperties + case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition) class DataLoadCoalescedRDD[T: ClassTag]( @@ -28,12 +30,16 @@ class DataLoadCoalescedRDD[T: ClassTag]( nodeList: Array[String]) extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { new DataLoadPartitionCoalescer(prev, nodeList).run } override def compute(split: Partition, context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = { + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) new Iterator[DataLoadPartitionWrap[T]] { val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 058c5c6..5790369 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import java.nio.ByteBuffer import java.text.SimpleDateFormat +import java.util import java.util.{Date, UUID} import scala.collection.JavaConverters._ @@ -126,12 +127,16 @@ class SparkPartitionLoader(model: CarbonLoadModel, var storeLocation: String = "" - def initialize(): Unit = { + def initialize(addedProperies: util.Map[String, String]): Unit = { val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") } + // Add the properties added in driver to executor. + CarbonProperties.getInstance().setProperties(addedProperies) + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId) CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true") CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1") @@ -185,6 +190,8 @@ class NewCarbonDataLoadRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration)) + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { if (isTableSplitPartition) { // for table split partition @@ -239,7 +246,7 @@ class NewCarbonDataLoadRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize() + loader.initialize(addedProperies) new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) @@ -392,6 +399,7 @@ class NewDataFrameLoaderRDD[K, V]( schemaLastUpdatedTime: Long, prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -430,7 +438,7 @@ class NewDataFrameLoaderRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize() + loader.initialize(addedProperies) new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray) } catch { case e: BadRecordFoundException => @@ -587,6 +595,7 @@ class PartitionTableDataLoaderRDD[K, V]( schemaLastUpdatedTime: Long, prev: RDD[Row]) extends RDD[(K, V)](prev) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName) @@ -616,7 +625,7 @@ class PartitionTableDataLoaderRDD[K, V]( String.valueOf(loadCount), loadMetadataDetails) // Intialize to set carbon properties - loader.initialize() + loader.initialize(addedProperies) new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) } catch { case e: BadRecordFoundException => http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala index 67e094a..30050f7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala @@ -22,6 +22,8 @@ import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD} +import org.apache.carbondata.core.util.CarbonProperties + // This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node class UpdateCoalescedRDD[T: ClassTag]( @@ -29,13 +31,16 @@ class UpdateCoalescedRDD[T: ClassTag]( nodeList: Array[String]) extends RDD[T](prev.context, Nil) { + private val addedProperies = CarbonProperties.getInstance().getAddedProperies + override def getPartitions: Array[Partition] = { new DataLoadPartitionCoalescer(prev, nodeList).run } override def compute(split: Partition, context: TaskContext): Iterator[T] = { - + // Add the properties added in driver to executor. 
+ CarbonProperties.getInstance().setProperties(addedProperies) // This iterator combines data from all the parent partitions new Iterator[T] { val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index bcfc096..6b94894 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.rdd +import java.util + import scala.collection.mutable import org.apache.spark.TaskContext @@ -52,7 +54,7 @@ object UpdateDataLoad { segId, loadMetadataDetails) // Intialize to set carbon properties - loader.initialize() + loader.initialize(new util.HashMap) loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) new DataLoadExecutor().execute(carbonLoadModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index f0cd33b..7bfd742 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions 
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _} import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan} -import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan} +import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand} @@ -316,6 +316,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } else { ExecutedCommand(HiveNativeCommand(sql)) :: Nil } + case set@SetCommand(kv) => + ExecutedCommand(CarbonSetCommand(set)) :: Nil case _ => Nil } http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index 0f42940..d047b20 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql._ -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.{RunnableCommand, SetCommand} import org.apache.spark.sql.execution.command.DropTableCommand import org.apache.spark.sql.hive.execution.HiveNativeCommand +import 
org.apache.carbondata.core.util.CarbonProperties + private[hive] case class CreateDatabaseCommand(dbName: String, command: HiveNativeCommand) extends RunnableCommand { def run(sqlContext: SQLContext): Seq[Row] = { @@ -53,3 +55,15 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String, rows } } + +case class CarbonSetCommand(command: SetCommand) + extends RunnableCommand { + + override val output = command.output + + override def run(sparkSession: SQLContext): Seq[Row] = { + val rows = command.run(sparkSession) + CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1)) + rows + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 48af516..2b77654 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -128,7 +128,7 @@ object CarbonDataRDDFactory { isCompactionTriggerByDDl ) - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() + val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ) @@ -275,8 +275,8 @@ object CarbonDataRDDFactory { exception = e } // continue in case of exception also, check for all the tables. 
- val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession). + sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ).equalsIgnoreCase("true") @@ -397,8 +397,8 @@ object CarbonDataRDDFactory { } storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - val isConcurrentCompactionAllowed = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, + val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession) + .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION ) .equalsIgnoreCase("true") @@ -1042,7 +1042,8 @@ object CarbonDataRDDFactory { val timeStampFormat = if (specificFormat.isDefined) { new SimpleDateFormat(specificFormat.get) } else { - val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants + val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession) + .sessionParams.getProperty(CarbonCommonConstants .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) new SimpleDateFormat(timestampFormatString) } @@ -1050,7 +1051,8 @@ object CarbonDataRDDFactory { val dateFormat = if (specificFormat.isDefined) { new SimpleDateFormat(specificFormat.get) } else { - val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants + val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession) + .sessionParams.getProperty(CarbonCommonConstants .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) new SimpleDateFormat(dateFormatString) } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index b46488c..0851ec2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, SessionParams} import org.apache.carbondata.spark.rdd.SparkReadSupport import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl @@ -35,6 +35,8 @@ class CarbonEnv { var carbonMetastore: CarbonMetastore = _ + var sessionParams: SessionParams = _ + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) // set readsupport class global so that the executor can get it. 
@@ -45,11 +47,12 @@ class CarbonEnv { def init(sparkSession: SparkSession): Unit = { sparkSession.udf.register("getTupleId", () => "") if (!initialized) { + sessionParams = new SessionParams() carbonMetastore = { val storePath = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) LOGGER.info(s"carbon env initial: $storePath") - new CarbonMetastore(sparkSession.conf, storePath) + new CarbonMetastore(sparkSession.conf, storePath, sessionParams) } CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") initialized = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 1c16143..3079c84 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider None) case _ => val options = new CarbonOption(parameters) - val storePath = CarbonProperties.getInstance() + val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams .getProperty(CarbonCommonConstants.STORE_LOCATION) val tablePath = storePath + "/" + options.dbName + "/" + options.tableName CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None) @@ -77,7 +77,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider "specified when creating CarbonContext") val options = new CarbonOption(parameters) - val storePath = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION) + val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams + .getProperty(CarbonCommonConstants.STORE_LOCATION) val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName) val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) .exists(tablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 4605914..8d0b4ea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -520,7 +520,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) { System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) } else { - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams + .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala index a8985b9..805a4df 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala @@ -24,8 +24,7 @@ import java.util.{Locale, TimeZone} import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} -import org.apache.spark.sql.CastExpr -import org.apache.spark.sql.sources +import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources} import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -35,7 +34,8 @@ object CastExpressionOptimization { def typeCastStringToLong(v: Any): Any = { - val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance + val parser: SimpleDateFormat = new SimpleDateFormat( + CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala index b72f077..627de02 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.hive.execution.command import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand} +import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand} + +import org.apache.carbondata.core.util.CarbonProperties case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) extends RunnableCommand { @@ -40,3 +42,17 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) rows } } + +case class CarbonSetCommand(command: SetCommand) + extends RunnableCommand { + + override val output = command.output + + override def run(sparkSession: SparkSession): Seq[Row] = { + val rows = command.run(sparkSession) + CarbonEnv.getInstance(sparkSession).sessionParams + .addProperty(rows.head.getString(0), rows.head.getString(1)) + rows + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 3593b6d..35be543 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -16,13 +16,12 @@ */ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, -ShowLoadsCommand, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, 
InsertIntoCarbonTable, ShowLoadsCommand, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} -import org.apache.spark.sql.hive.execution.command.CarbonDropDatabaseCommand +import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -110,13 +109,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted) - if - CarbonEnv.getInstance(sparkSession).carbonMetastore + if CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(identifier)(sparkSession) && isFormatted => val resolvedTable = sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil + case set@SetCommand(kv) => + ExecutedCommandExec(CarbonSetCommand(set)) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 0064c21..f1fd05b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) carbonLoadModel.setStorePath(relation.tableMeta.storePath) - var storeLocation = CarbonProperties.getInstance + var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, System.getProperty("java.io.tmpdir") ) @@ -359,7 +359,8 @@ case class LoadTable( sys.error(s"Data loading failed. table not found: $dbName.$tableName") } - CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") + CarbonEnv.getInstance(sparkSession).sessionParams + .addProperty("zookeeper.enable.lock", "false") val carbonLock = CarbonLockFactory .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier, @@ -408,7 +409,7 @@ case class LoadTable( val columnDict = options.getOrElse("columndict", null) val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N") val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false") - val badRecordActionValue = CarbonProperties.getInstance() + val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue) @@ -428,11 +429,12 @@ case class LoadTable( carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#")) carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( + 
carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession) + .sessionParams.getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, + carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams. + getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel .setSerializationNullFormat( @@ -534,7 +536,7 @@ case class LoadTable( allDictionaryPath) } // dictionaryServerClient dictionary generator - val dictionaryServerPort = CarbonProperties.getInstance() + val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) val sparkDriverHost = sparkSession.sqlContext.sparkContext. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index 04a94ce..54cffc2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.{RuntimeConfig, SparkSession} +import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca import org.apache.carbondata.core.reader.ThriftReader import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} @@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { } } -class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { +class 
CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) { @transient val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") @@ -201,18 +201,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { // if zookeeper is configured as carbon lock type. val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) + sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) } if (metadataPath == null) { return null } // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && + if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) && FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, + sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS ) LOGGER.info("Default lock type HDFSLOCK is configured")
