This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new afb7626  [HOTFIX] Refactor Carbon Util
afb7626 is described below

commit afb7626e6c4c31c93795eebcbcc337e83e595fb3
Author: Zhangshunyu <zhangshunyu1...@126.com>
AuthorDate: Fri Nov 27 11:17:55 2020 +0800

    [HOTFIX] Refactor Carbon Util

    Why is this PR needed?
    Currently we have several purpose-specific Carbon{$FUNCTION_NAME}Util classes,
    for example CarbonSparkUtil, CarbonQueryUtil, CarbonMergeUtil, and
    CarbonLoadUtil, but we also have CarbonUtil/CarbonUtils, which hold a mix of
    unrelated functions. We should clean up the code and move each function in
    CarbonUtils to the specific Util class where it belongs.

    What changes were proposed in this PR?
    Refactor the code to clean it up: rename CarbonUtils to CarbonThreadUtil
    (it now holds only thread-local session helpers) and move
    collectCarbonRelation to CarbonSparkUtil.

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    No

    This closes #4029
---
 .../carbondata/benchmark/ConcurrentQueryBenchmark.scala       |  2 +-
 .../org/apache/carbondata/examples/S3CsvExample.scala         |  2 +-
 .../apache/carbondata/spark/rdd/CarbonTableCompactor.scala    |  6 +++---
 .../org/apache/carbondata/spark/util/CarbonSparkUtil.scala    | 10 ++++++++++
 .../org/apache/carbondata/view/MVManagerInSpark.scala         |  6 +++---
 .../scala/org/apache/carbondata/view/MVRefresher.scala        |  6 +++---
 .../apache/spark/sql/CarbonDatasourceHadoopRelation.scala     |  2 +-
 .../sql/{CarbonUtils.scala => CarbonThreadUtil.scala}         | 10 +---------
 .../sql/execution/command/management/CommonLoadUtils.scala    |  4 ++--
 .../command/mutation/merge/CarbonMergeDataSetCommand.scala    |  5 ++---
 .../apache/spark/sql/parser/CarbonExtensionSqlParser.scala    |  4 ++--
 .../org/apache/spark/sql/parser/CarbonSparkSqlParser.scala    |  4 ++--
 .../optimizer/CarbonSITransformationRule.scala                |  5 +++--
 .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala        |  6 +++---
 .../TestSegmentReadingForMultiThreading.scala                 | 14 +++++++------
 .../spark/carbondata/query/TestFilterReordering.scala         |  6 +++---
 16 files changed, 47 insertions(+), 45 deletions(-)
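[Note: the renamed CarbonThreadUtil keeps only thread-scoped helpers (threadSet,
threadUnset, updateSessionInfoToCurrentThread), backed by CarbonSessionInfo and
ThreadLocalSessionInfo. The minimal sketch below illustrates the
set/try/finally-unset discipline the call sites in this diff follow;
ThreadLocalProps is a simplified stand-in, not the real Carbon class.]

import java.util.{HashMap => JHashMap}

// Simplified stand-in for CarbonSessionInfo/ThreadLocalSessionInfo: each
// thread sees its own property map, so concurrent operations (compaction,
// MV refresh, SI creation) can pin different values for the same key.
object ThreadLocalProps {
  private val props = new ThreadLocal[JHashMap[String, String]] {
    override def initialValue(): JHashMap[String, String] = new JHashMap()
  }
  def threadSet(key: String, value: String): Unit = props.get.put(key, value)
  def threadUnset(key: String): Unit = props.get.remove(key)
  def threadGet(key: String): Option[String] = Option(props.get.get(key))
}

object ThreadLocalPropsDemo extends App {
  val key = "carbon.input.segments.default.demo_table" // illustrative key
  ThreadLocalProps.threadSet(key, "0,1,2")
  try {
    // ... run the segment-scoped work here ...
    assert(ThreadLocalProps.threadGet(key).contains("0,1,2"))
  } finally {
    ThreadLocalProps.threadUnset(key) // always unset, as the diff below does
  }
}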
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
index d1ca452..d49940c 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -513,7 +513,7 @@ object ConcurrentQueryBenchmark {
       .addProperty("carbon.blockletgroup.size.in.mb", "32")
       .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
       .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "false")
-    import org.apache.spark.sql.CarbonUtils._
+    import org.apache.spark.sql.CarbonThreadUtil._
     // 1. initParameters
     initParameters(args)
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
index f9a5b90..01345b3 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala
@@ -35,7 +35,7 @@ object S3CsvExample {
       + "../../../..").getCanonicalPath
     val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
-    import org.apache.spark.sql.CarbonUtils._
+    import org.apache.spark.sql.CarbonThreadUtil._
     if (args.length != 4) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
         "<s3.csv.location> <spark-master>")
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 9e1369b..a381089 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
+import org.apache.spark.sql.{CarbonThreadUtil, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -433,7 +433,7 @@ class CarbonTableCompactor(
         carbonMergerMapping.validSegments)
     var loadResult: Array[(String, Boolean)] = null
     try {
-      CarbonUtils
+      CarbonThreadUtil
        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                   table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
          splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
@@ -458,7 +458,7 @@ class CarbonTableCompactor(
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
     } finally {
-      CarbonUtils
+      CarbonThreadUtil
        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName + "."
                     + table.getTableName)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d3607c1..53e5e5c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -26,6 +26,9 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types._
 
@@ -47,6 +50,13 @@ object CarbonSparkUtil {
       table)
   }
 
+  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = {
+    plan collect {
+      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    }
+  }
+
   /**
    * return's the formatted column comment if column comment is present else empty("")
    *
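[Note: a hedged usage sketch of the relocated helper. It assumes a SparkSession
with the CarbonData extensions installed and a hypothetical Carbon table t1,
and mirrors the single-relation check that CarbonMergeDataSetCommand performs
later in this diff.]

// Sketch only: "t1" and the session are hypothetical; collectCarbonRelation
// and its signature come from the hunk above.
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.spark.util.CarbonSparkUtil

def requireSingleCarbonTarget(spark: SparkSession): Unit = {
  val plan = spark.sql("SELECT * FROM t1").queryExecution.analyzed
  val relations = CarbonSparkUtil.collectCarbonRelation(plan)
  // A merge target must be backed by exactly one Carbon table.
  require(relations.length == 1, "target must be a single CarbonData table")
}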
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index ebdc3e8..676c135 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -21,7 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory, MVManager,
 
 class MVManagerInSpark(session: SparkSession) extends MVManager {
   override def getDatabases: util.List[String] = {
-    CarbonUtils.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
+    CarbonThreadUtil.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
     try {
       val databaseList = session.catalog.listDatabases()
       val databaseNameList = new util.ArrayList[String]()
@@ -38,7 +38,7 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
       }
       databaseNameList
     } finally {
-      CarbonUtils.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
+      CarbonThreadUtil.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 152fbb4..31b5548 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import com.google.gson.Gson
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.parser.MVQueryParser
 
@@ -372,7 +372,7 @@ object MVRefresher {
    */
   private def setInputSegments(tableUniqueName: String,
       mainTableSegmentList: java.util.List[String]): Unit = {
-    CarbonUtils
+    CarbonThreadUtil
       .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueName,
         mainTableSegmentList.asScala.mkString(","))
   }
@@ -380,7 +380,7 @@ object MVRefresher {
   private def unsetInputSegments(schema: MVSchema): Unit = {
     val relatedTableIdentifiers = schema.getRelatedTables
     for (relationIdentifier <- relatedTableIdentifiers.asScala) {
-      CarbonUtils
+      CarbonThreadUtil
        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                     relationIdentifier.getDatabaseName + "." +
                     relationIdentifier.getTableName)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 10335d9..a1273af 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -55,7 +55,7 @@ case class CarbonDatasourceHadoopRelation(
     FileFactory.getUpdatedFilePath(paths.head),
     CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession),
     caseInsensitiveMap("tablename"))
-  CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+  CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =
     CarbonEnv.getInstance(sparkSession).carbonMetaStore.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
similarity index 89%
rename from integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala
rename to integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
index 9826666..25f9b21 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonThreadUtil.scala
@@ -19,13 +19,11 @@ package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 
 import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 
-object CarbonUtils {
+object CarbonThreadUtil {
 
   private[sql] val threadStatementId = new ThreadLocal[Long]
 
@@ -81,10 +79,4 @@ object CarbonUtils {
       .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
   }
 
-  def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = {
-    plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    }
-  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 3bc2590..d22a1e0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -861,7 +861,7 @@ object CommonLoadUtils {
   def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
-    CarbonUtils.threadSet("partition.operationcontext", loadParams.operationContext)
+    CarbonThreadUtil.threadSet("partition.operationcontext", loadParams.operationContext)
     val attributes = if (loadParams.scanResultRDD.isDefined) {
       // take the already re-arranged attributes
       catalogTable.schema.toAttributes
@@ -1059,7 +1059,7 @@ object CommonLoadUtils {
       LOGGER.error(ex)
       throw ex
     } finally {
-      CarbonUtils.threadUnset("partition.operationcontext")
+      CarbonThreadUtil.threadUnset("partition.operationcontext")
       if (loadParams.isOverwriteTable) {
         IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
         // Clean the overwriting segments if any.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 6aa8c88..b026e4d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonThreadUtil, Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.avro.AvroFileFormatFactory
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -49,7 +49,6 @@ import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -79,7 +78,7 @@ case class CarbonMergeDataSetCommand(
    *
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val relations = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
     // Target dataset must be backed by carbondata table.
     if (relations.length != 1) {
       throw new UnsupportedOperationException(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
index aa1c05f..a072b26 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSqlParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parser
 
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlParser
@@ -42,7 +42,7 @@ class CarbonExtensionSqlParser(
     parser.synchronized {
       CarbonEnv.getInstance(sparkSession)
     }
-    CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
     try {
       val plan = parser.parse(sqlText)
       plan
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index f1d8f26..82d24f9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{CarbonSession, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonSession, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -45,7 +45,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Sp
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    CarbonUtils.updateSessionInfoToCurrentThread(sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sparkSession)
     try {
       val parsedPlan = super.parsePlan(sqlText)
       CarbonScalaUtil.cleanParserThreadLocals
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index dba8ff2..8a63351 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.secondaryindex.optimizer
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonThreadUtil, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -29,6 +29,7 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 /**
  * Rule for rewriting plan if query has a filter on index table column
@@ -61,7 +62,7 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
 
   private def checkIfRuleNeedToBeApplied(plan: LogicalPlan): Boolean = {
     var isRuleNeedToBeApplied = false
-    val relations = CarbonUtils.collectCarbonRelation(plan)
+    val relations = CarbonSparkUtil.collectCarbonRelation(plan)
     val isCreateAsSelect = isCreateTableAsSelect(plan)
     if (relations.nonEmpty && !isCreateAsSelect) {
       plan.collect {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 582ecfe..1d9b06b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.rdd.{CarbonMergeFilesRDD, RDD}
-import org.apache.spark.sql.{functions, CarbonEnv, CarbonUtils, DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{functions, CarbonEnv, CarbonThreadUtil, DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -582,7 +582,7 @@ object SecondaryIndexCreator {
       projections: String,
       segments: Array[String]): DataFrame = {
     try {
-      CarbonUtils.threadSet(
+      CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
         CarbonCommonConstants.POINT + carbonTable.getTableName, segments.mkString(","))
       val logicalPlan = sparkSession.sql(
@@ -609,7 +609,7 @@ object SecondaryIndexCreator {
       tableProperties.put("isPositionIDRequested", "true")
       SparkSQLUtil.execute(newLogicalPlan, sparkSession)
     } finally {
-      CarbonUtils
+      CarbonThreadUtil
        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
                     carbonTable.getDatabaseName + CarbonCommonConstants.POINT +
                     carbonTable.getTableName)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
index 1aeea03..07f48cd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -23,7 +23,7 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.sql.{CarbonUtils, Row}
+import org.apache.spark.sql.{CarbonThreadUtil, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -59,31 +59,31 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA
   }
 
   test("test multithreading for segment reading") {
CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3") + CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3") val df = sql("select count(empno) from carbon_table_MulTI_THread") checkAnswer(df, Seq(Row(30))) val four = Future { - CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3") + CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3") val df = sql("select count(empno) from carbon_table_MulTI_THread") checkAnswer(df, Seq(Row(20))) } val three = Future { - CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2") + CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2") val df = sql("select count(empno) from carbon_table_MulTI_THread") checkAnswer(df, Seq(Row(30))) } val one = Future { - CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2") + CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2") val df = sql("select count(empno) from carbon_table_MulTI_THread") checkAnswer(df, Seq(Row(20))) } val two = Future { - CarbonUtils.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1") + CarbonThreadUtil.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1") val df = sql("select count(empno) from carbon_table_MulTI_THread") checkAnswer(df, Seq(Row(10))) } @@ -94,7 +94,7 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA override def afterAll: Unit = { sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread") - CarbonUtils.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread") + CarbonThreadUtil.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread") CarbonProperties.getInstance() .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED) } diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala index 3cee00e..078bffb 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/query/TestFilterReordering.scala @@ -17,7 +17,7 @@ package org.apache.spark.carbondata.query -import org.apache.spark.sql.{CarbonEnv, CarbonUtils} +import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.sources.{And, EqualTo, Filter, Or} import org.apache.spark.sql.test.util.QueryTest @@ -59,7 +59,7 @@ class TestFilterReordering extends QueryTest with BeforeAndAfterAll{ test("test disabling filter reordering") { sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=false") - CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession) + CarbonThreadUtil.updateSessionInfoToCurrentThread(sqlContext.sparkSession) val filter1 = Or(And(EqualTo("four", 11), EqualTo("two", 11)), EqualTo("one", 11)) val table = CarbonEnv.getCarbonTable(None, "filter_reorder")(sqlContext.sparkSession) val d: Array[Filter] = CarbonFilters.reorderFilter(Array(filter1), table) @@ -69,7 +69,7 @@ class TestFilterReordering extends QueryTest with BeforeAndAfterAll{ override protected def afterAll(): Unit = { sqlContext.sparkSession.sql(s"set 
     sqlContext.sparkSession.sql(s"set ${CarbonCommonConstants.CARBON_REORDER_FILTER}=true")
-    CarbonUtils.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+    CarbonThreadUtil.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
     sql("drop table if exists filter_reorder")
   }
 }
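[Note: updateSessionInfoToCurrentThread, touched throughout this diff, binds
the session's Carbon configuration to the calling thread; the parsers invoke
it before parse() and the tests before reordering filters. The wrapper below
is an illustrative sketch of using it from a fresh worker thread, not part of
the commit.]

import org.apache.spark.sql.{CarbonThreadUtil, SparkSession}

// Illustrative helper: run `body` on a new thread, first binding the
// session's Carbon config to that thread as the parsers above do.
def runWithSessionInfo(spark: SparkSession)(body: => Unit): Unit = {
  val worker = new Thread(new Runnable {
    override def run(): Unit = {
      CarbonThreadUtil.updateSessionInfoToCurrentThread(spark)
      body
    }
  })
  worker.start()
  worker.join()
}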