Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3066#discussion_r246995496
--- Diff:
examples/spark2/src/main/scala/org/apache/carbondata/benchmark/CDCBenchmark.scala
---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.sql.Date
+
+import org.apache.commons.lang3.time.DateUtils
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+/**
+ * Benchmark for Change Data Capture scenario.
+ * This test simulates updates to history table using CDC table.
+ *
+ * The benchmark shows performance of two update methods:
+ * 1. hive_solution, which uses INSERT OVERWRITE. This is a popular method
for hive warehouse.
+ * 2. carbon_solution, which uses CarbonData's update syntax to update the
history table directly.
+ *
+ * When running on an 8-core laptop, the benchmark shows:
+ *
+ * 1. test one
+ * History table 1M records, update 10K records everyday and insert 10K
records everyday,
+ * simulated 3 days.
+ * hive_solution: total process time takes 13,516 ms
+ * carbon_solution: total process time takes 7,521 ms
+ *
+ *
+ * 2. test two
+ * History table 10M records, update 10K records everyday and insert 10K
records everyday,
+ * simulated 3 days.
+ * hive_solution: total process time takes 104,250 ms
+ * carbon_solution: total process time takes 17,384 ms
+ *
+ */
+object CDCBenchmark {
+
+ // Schema for history table
+ // Table name: dw_order
+ // +-------------+-----------+-------------+
+ // | Column name | Data type | Cardinality |
+ // +-------------+-----------+-------------+
+ // | order_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | customer_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | start_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | end_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | state | int | 4 |
+ // +-------------+-----------+-------------+
+ case class Order (order_id: String, customer_id: String, start_date:
Date, end_date: Date,
+ state: Int) // one row of the history table (dw_order)
+
+ // Schema for CDC data which is used for update to history table every
day
+ // Table name: ods_order
+ // +-------------+-----------+-------------+
+ // | Column name | Data type | Cardinality |
+ // +-------------+-----------+-------------+
+ // | order_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | customer_id | string | 10,000,000 |
+ // +-------------+-----------+-------------+
+ // | update_date | date | NA |
+ // +-------------+-----------+-------------+
+ // | state | int | 4 |
+ // +-------------+-----------+-------------+
+ case class CDC (order_id: String, customer_id: String, update_date:
Date, state: Int) // one row of the daily CDC table (ods_order)
+
+ // number of records for first day
+ val numOrders = 10000000
+
+ // number of records to update every day
+ val numUpdateOrdersDaily = 10000
+
+ // number of new records to insert every day
+ val newNewOrdersDaily = 10000
+
+ // number of days to simulate
+ val numDays = 3
+
+ // whether to print every day's result to the console
+ val printDetail = false
+
+ def generateDataForDay0( // builds the day-0 data: one Order row per id in 1..numOrders
+ sparkSession: SparkSession, // session that creates the RDD and supplies the implicits
+ numOrders: Int = 1000000, // number of Order rows to generate
+ startDate: Date = Date.valueOf("2018-05-01")): DataFrame = { // start_date of every row
+ import sparkSession.implicits._
+ sparkSession.sparkContext.parallelize(1 to numOrders, 4) // split into 4 partitions
+ .map { x => Order(s"order$x", s"customer$x", startDate,
Date.valueOf("9999-01-01"), 1) // end_date fixed at 9999-01-01 (presumably a still-open marker; confirm), state 1
+ }.toDS().toDF()
+ }
+
+ def generateDailyCDC( // builds one day's CDC data: update rows unioned with new-order rows
+ sparkSession: SparkSession, // session that creates the RDDs and supplies the implicits
+ numUpdatedOrders: Int, // update rows are generated for order ids 1..numUpdatedOrders
+ startDate: Date, // NOTE(review): not referenced anywhere in this method body
+ updateDate: Date, // update_date written to every generated row
+ newState: Int, // state written to the update rows
+ numNewOrders: Int // number of new-order rows to generate
+ ): DataFrame = {
+ import sparkSession.implicits._
+ val ds1 = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders,
4)
+ .map {x => CDC(s"order$x", s"customer$x", updateDate, newState)
+ }.toDS().toDF()
+ val ds2 = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
+ .map {x => CDC(s"newOrder${System.currentTimeMillis()}",
s"customer$x", updateDate, 1) // NOTE(review): the id uses only the clock, not x, so rows built in the same millisecond share an order_id; new rows always get state 1
+ }.toDS().toDF()
+ ds1.union(ds2) // update rows first, then new-order rows
+ }
+
+ def main(args: Array[String]): Unit = {
+ import org.apache.spark.sql.CarbonSession._
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val master = Option(System.getProperty("spark.master"))
+ .orElse(sys.env.get("MASTER"))
+ .orElse(Option("local[8]"))
+
+ val spark = SparkSession
+ .builder()
+ .master(master.get)
+ .enableHiveSupport()
+ .config("spark.driver.host", "127.0.0.1")
+ .getOrCreateCarbonSession(storeLocation)
+ spark.sparkContext.setLogLevel("warn")
+
+ spark.sql("drop table if exists dw_order")
+ spark.sql("drop table if exists ods_order")
+
+ // prepare base data for first day
+ val df = generateDataForDay0(
+ sparkSession = spark,
+ numOrders = numOrders,
+ startDate = Date.valueOf("2018-05-01"))
+
+ spark.sql(s"drop table if exists dw_order")
+ df.write
+ .format("carbondata")
+ .option("tableName", "dw_order")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ var startDate = Date.valueOf("2018-05-01")
+ var state = 2
+ var updateTime = 0L
+
+ if (printDetail) {
+ println("## day0")
+ spark.sql("select * from dw_order").show(100, false)
+ }
+
+ for (i <- 1 to numDays) {
+ // prepare for incremental update data for day-i
+ val newDate = new Date(DateUtils.addDays(startDate, 1).getTime)
+ val cdc = generateDailyCDC(
+ sparkSession = spark,
+ numUpdatedOrders = numUpdateOrdersDaily,
+ startDate = startDate,
+ updateDate = newDate,
+ newState = state,
+ numNewOrders = newNewOrdersDaily)
+ cdc.write
+ .format("carbondata")
+ .option("tableName", "ods_order")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ if (printDetail) {
+ println(s"day$i CDC")
+ spark.sql("select * from ods_order").show(100, false)
+ }
+
+ // update dw table using CDC data
+ val start = System.nanoTime()
+ hive_solution(spark)
+ // carbon_solution(spark)
--- End diff --
Better to add a variable that selects whether hive_solution or carbon_solution
is run, so each can be run separately; switching between them by commenting code out is confusing to others.
---