Repository: carbondata Updated Branches: refs/heads/master 4b8dc0a58 -> 46cee146d
[CARBONDATA-2379] Support SearchModeExample run in cluster 1. support SearchModeExample running in the cluster 2. change the worker hostname to hostAddress 3. support run ConcurrentQueryBenchmark with search mode 4. remove read.close, which may lead to a JVM crash This closes #2173 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/46cee146 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/46cee146 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/46cee146 Branch: refs/heads/master Commit: 46cee146db36617ea20c7bdaa2415e9bde02f63d Parents: 4b8dc0a Author: xubo245 <[email protected]> Authored: Mon Apr 23 10:39:59 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu May 3 09:52:09 2018 +0800 ---------------------------------------------------------------------- .../benchmark/ConcurrentQueryBenchmark.scala | 78 ++++++++++++++------ .../benchmark/SimpleQueryBenchmark.scala | 6 +- .../carbondata/examples/SearchModeExample.scala | 47 +++++++++--- .../scala/org/apache/spark/rpc/Master.scala | 2 +- 4 files changed, 98 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala index a1a1428..697d13f 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala @@ -54,7 +54,8 @@ import org.apache.carbondata.spark.util.DataGenerator * --executor-memory 24g \ * 
--num-executors 3 \ * concurrencyTest.jar \ - * totalNum threadNum taskNum resultIsEmpty runInLocal generateFile deleteFile + * totalNum threadNum taskNum resultIsEmpty runInLocal generateFile + * deleteFile openSearchMode storeLocation * details in initParameters method of this benchmark */ object ConcurrentQueryBenchmark { @@ -75,6 +76,10 @@ object ConcurrentQueryBenchmark { var generateFile = true // whether delete file var deleteFile = true + // open search mode, default value is false + var openSearchMode = false + // carbon store location + var storeLocation = "/tmp" val cardinalityId = 100 * 1000 * 1000 val cardinalityCity = 6 @@ -234,23 +239,24 @@ object ConcurrentQueryBenchmark { } else { null } - - val table1Time = time { - if (table1.endsWith("parquet")) { - if (generateFile) { - generateParquetTable(spark, df, table1) - } - spark.read.parquet(table1).createOrReplaceTempView(table1) - } else if (table1.endsWith("orc")) { - if (generateFile) { - generateOrcTable(spark, df, table1) - spark.read.orc(table1).createOrReplaceTempView(table1) + if (!openSearchMode) { + val table1Time = time { + if (table1.endsWith("parquet")) { + if (generateFile) { + generateParquetTable(spark, df, storeLocation + "/" + table1) + } + spark.read.parquet(storeLocation + "/" + table1).createOrReplaceTempView(table1) + } else if (table1.endsWith("orc")) { + if (generateFile) { + generateOrcTable(spark, df, table1) + spark.read.orc(table1).createOrReplaceTempView(table1) + } + } else { + sys.error("invalid table: " + table1) } - } else { - sys.error("invalid table: " + table1) } + println(s"$table1 completed, time: $table1Time sec") } - println(s"$table1 completed, time: $table1Time sec") val table2Time: Double = if (generateFile) { generateCarbonTable(spark, df, table2) @@ -417,13 +423,26 @@ object ConcurrentQueryBenchmark { */ def runTest(spark: SparkSession, table1: String, table2: String): Unit = { // run queries on parquet and carbon - runQueries(spark, table1) + if 
(!openSearchMode) { + runQueries(spark, table1) + } // do GC and sleep for some time before running next table System.gc() Thread.sleep(1000) System.gc() Thread.sleep(1000) runQueries(spark, table2) + if (openSearchMode) { + runQueries(spark, table2) + // start search mode (start all gRPC server) + // following queries will be run using gRPC + spark.asInstanceOf[CarbonSession].startSearchMode() + println("Open search mode:") + runQueries(spark, table2) + runQueries(spark, table2) + // stop gRPC servers + spark.asInstanceOf[CarbonSession].stopSearchMode() + } } /** @@ -468,6 +487,9 @@ object ConcurrentQueryBenchmark { } if (arr.length > 5) { runInLocal = if (arr(5).equalsIgnoreCase("true")) { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + storeLocation = s"$rootPath/examples/spark2/target/store" true } else if (arr(5).equalsIgnoreCase("false")) { false @@ -493,6 +515,18 @@ object ConcurrentQueryBenchmark { throw new Exception("error parameter, should be true or false") } } + if (arr.length > 8) { + openSearchMode = if (arr(8).equalsIgnoreCase("true")) { + true + } else if (arr(8).equalsIgnoreCase("false")) { + false + } else { + throw new Exception("error parameter, should be true or false") + } + } + if (arr.length > 9) { + storeLocation = arr(9) + } } /** @@ -520,12 +554,11 @@ object ConcurrentQueryBenchmark { "\tfile path: " + path + "\trunInLocal: " + runInLocal + "\tgenerateFile: " + generateFile + - "\tdeleteFile: " + deleteFile + "\tdeleteFile: " + deleteFile + + "\topenSearchMode: " + openSearchMode + + "\tstoreLocation: " + storeLocation val spark = if (runInLocal) { - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val storeLocation = s"$rootPath/examples/spark2/target/store" SparkSession .builder() .appName(parameters) @@ -537,10 +570,9 @@ object ConcurrentQueryBenchmark { .builder() .appName(parameters) .enableHiveSupport() - 
.getOrCreateCarbonSession() + .getOrCreateCarbonSession(storeLocation) } spark.sparkContext.setLogLevel("ERROR") - println("\nEnvironment information:") val env = Array( "spark.master", @@ -560,10 +592,8 @@ object ConcurrentQueryBenchmark { // 2. prepareTable prepareTable(spark, table1, table2) - spark.asInstanceOf[CarbonSession].startSearchMode() // 3. runTest runTest(spark, table1, table2) - spark.asInstanceOf[CarbonSession].stopSearchMode() if (deleteFile) { CarbonUtil.deleteFoldersAndFiles(new File(table1)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala index e9c880b..ce69c66 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala @@ -314,9 +314,13 @@ object SimpleQueryBenchmark { val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath val storeLocation = s"$rootPath/examples/spark2/target/store" + val master = Option(System.getProperty("spark.master")) + .orElse(sys.env.get("MASTER")) + .orElse(Option("local[8]")) + val spark = SparkSession .builder() - .master("local") + .master(master.get) .enableHiveSupport() .config("spark.driver.host", "127.0.0.1") .getOrCreateCarbonSession(storeLocation) http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala index 03e724f..aeb4c29 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala @@ -22,7 +22,8 @@ import java.util.concurrent.{Executors, ExecutorService} import org.apache.spark.sql.{CarbonSession, SparkSession} -import org.apache.carbondata.examples.util.ExampleUtils +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties /** * An example that demonstrate how to run queries in search mode, @@ -32,16 +33,46 @@ import org.apache.carbondata.examples.util.ExampleUtils object SearchModeExample { def main(args: Array[String]) { - val spark = ExampleUtils.createCarbonSession("SearchModeExample") + import org.apache.spark.sql.CarbonSession._ + val master = Option(System.getProperty("spark.master")) + .orElse(sys.env.get("MASTER")) + .orElse(Option("local[8]")) + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true") + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "") + + val filePath = if (args.length > 0) { + args(0) + } else { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + s"$rootPath/examples/spark2/src/main/resources/data.csv" + } + val storePath = if (args.length > 1) { + args(1) + } else { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + s"$rootPath/examples/spark2/target/store" + } + + val spark = SparkSession + .builder() + .master(master.get) + .appName("SearchModeExample") + .config("spark.sql.crossJoin.enabled", "true") + 
.getOrCreateCarbonSession(storePath) + spark.sparkContext.setLogLevel("ERROR") - exampleBody(spark) + exampleBody(spark, filePath) + println("Finished!") spark.close() } - def exampleBody(spark : SparkSession): Unit = { - - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath + def exampleBody(spark: SparkSession, path: String): Unit = { spark.sql("DROP TABLE IF EXISTS carbonsession_table") @@ -64,8 +95,6 @@ object SearchModeExample { | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField') """.stripMargin) - val path = s"$rootPath/examples/spark2/src/main/resources/data.csv" - spark.sql( s""" | LOAD DATA LOCAL INPATH '$path' http://git-wip-us.apache.org/repos/asf/carbondata/blob/46cee146/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala index bc44fb6..e98a780 100644 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala @@ -210,7 +210,7 @@ class Master(sparkConf: SparkConf, port: Int) { /** * Prune data by using CarbonInputFormat.getSplit - * Return a mapping of hostname to list of block + * Return a mapping of host address to list of block */ private def pruneBlock( table: CarbonTable,
