This is an automated email from the ASF dual-hosted git repository.
liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b4fb3f8 [CARBONDATA-3726] Add CDC and Data Dedup example
b4fb3f8 is described below
commit b4fb3f801aa74c7e83c30b4603f847ac4bb1189d
Author: Jacky Li <[email protected]>
AuthorDate: Fri Feb 28 14:39:58 2020 +0800
[CARBONDATA-3726] Add CDC and Data Dedup example
Why is this PR needed?
Change Data Capture and Data Duplication is common tasks for data
management, we should add examples for user to reference
We can leverage CarbonData's Merge API to implement these use cases
What changes were proposed in this PR?
1.Two examples are added
2.A performance improvement: earlier carbon always use 'full_outer' join to
do Merge, in this PR we decide Join type based on match condition
3.Make auditor enable dynamically, user can disable it.
The output of CDCExample as following. By default in the example, target
table has 200,000 rows and each change batch contains 9000 rows update, 1000
rows insert and 1000 rows delete
start CDC example using hive solution
generating target table...done! 10.43 s
applying change batch1...done! 5.94 s
applying change batch2...done! 3.99 s
applying change batch3...done! 3.58 s
applying change batch4...done! 3.31 s
applying change batch5...done! 3.22 s
applying change batch6...done! 4.31 s
applying change batch7...done! 5.00 s
applying change batch8...done! 4.02 s
applying change batch9...done! 5.88 s
applying change batch10...done! 5.33 s
total update takes 44.59 s
total query takes 1.33 s
start CDC example using carbon solution
generating target table...done! 10.03 s
applying change batch1...done! 4.48 s
applying change batch2...done! 2.35 s
applying change batch3...done! 2.00 s
applying change batch4...done! 1.99 s
applying change batch5...done! 1.80 s
applying change batch6...done! 1.81 s
applying change batch7...done! 1.59 s
applying change batch8...done! 1.70 s
applying change batch9...done! 1.74 s
applying change batch10...done! 1.70 s
total update takes 21.18 s
total query takes 0.81 s
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3643
---
.../core/constants/CarbonCommonConstants.java | 7 +
.../carbondata/core/util/CarbonProperties.java | 11 +
.../apache/carbondata/examples/CDCExample.scala | 332 +++++++++++++++++++++
.../apache/carbondata/examples/DedupExample.scala | 207 +++++++++++++
.../mutation/merge/CarbonMergeDataSetCommand.scala | 46 ++-
.../apache/carbondata/processing/util/Auditor.java | 33 +-
6 files changed, 615 insertions(+), 21 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c020fc2..c1d7d81 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2249,6 +2249,13 @@ public final class CarbonCommonConstants {
500;
/**
+ * Set it to true to enable audit
+ */
+ public static final String CARBON_ENABLE_AUDIT = "carbon.audit.enabled";
+
+ public static final String CARBON_ENABLE_AUDIT_DEFAULT = "true";
+
+ /**
* This property will be used to store datamap name
*/
public static final String DATAMAP_NAME = "datamap_name";
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 81faba0..f4ca24e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2003,4 +2003,15 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT,
CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT_DEFAULT));
}
+
+ public static boolean isAuditEnabled() {
+ return Boolean.parseBoolean(getInstance().getProperty(
+ CarbonCommonConstants.CARBON_ENABLE_AUDIT,
+ CarbonCommonConstants.CARBON_ENABLE_AUDIT_DEFAULT
+ ));
+ }
+
+ public static void setAuditEnabled(boolean enabled) {
+ getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_AUDIT,
String.valueOf(enabled));
+ }
}
diff --git
a/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
new file mode 100644
index 0000000..cea17c4
--- /dev/null
+++
b/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.time.LocalDateTime
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.functions._
+
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.util.Auditor
+
+/**
+ * CDC (Change Data Capture) example, it reads input CSV files as Change input
file and merge
+ * it into a target CarbonData table
+ *
+ * The schema of target table:
+ * (id int, value string, remark string, mdt timestamp)
+ *
+ * The schema of change table:
+ * (id int, value string, change_type string, mdt timestamp)
+ *
+ * change_type can be I/U/D
+ *
+ * This example generate N number batch of change data and merge them into
target table
+ */
+// scalastyle:off println
+object CDCExample {
+
+ case class Target (id: Int, value: String, remark: String, mdt: String)
+
+ case class Change (id: Int, value: String, change_type: String, mdt: String)
+
+ // User can set it to "carbon" or "hive"
+ // If "carbon" is set, will use CarbonData's MERGE to perform CDC
+ // If "hive" is set, will use INSERT OVERWRITE to perform CDC
+ private val solution = "carbon"
+
+ // print result or not to console for debugging
+ private val printDetail = false
+
+ // number of records for target table before start CDC
+ private val numInitialRows = 100000
+
+ // number of records to insert for every batch
+ private val numInsertPerBatch = 1000
+
+ // number of records to update for every batch
+ private val numUpdatePerBatch = 9000
+
+ // number of records to delete for every batch
+ private val numDeletePerBatch = 1000
+
+ // number of batch to simulate CDC
+ private val numBatch = 10
+
+ private val random = new Random()
+
+ // generate 100 random strings
+ private val values =
+ (1 to 100).map { x =>
+ // to simulate a wide target table, make a relatively long string for
value
+ random.nextString(100)
+ }
+
+ // pick one value randomly
+ private def pickValue = values(random.nextInt(values.size))
+
+ // IDs in the target table
+ private val currentIds = new java.util.ArrayList[Int](numInitialRows * 2)
+ private def getId(index: Int) = currentIds.get(index)
+ private def getAndRemoveId(index: Int) = currentIds.remove(index)
+ private def addId(id: Int) = currentIds.add(id)
+ private def removeId(index: Int) = currentIds.remove(index)
+ private def numOfIds = currentIds.size
+ private def maxId: Int = currentIds.asScala.max
+
+ private val INSERT = "I"
+ private val UPDATE = "U"
+ private val DELETE = "D"
+
+ // generate change data for insert
+ private def generateRowsForInsert(sparkSession: SparkSession) = {
+ // data for insert to the target table
+ val insertRows = (maxId + 1 to maxId + numInsertPerBatch).map { x =>
+ addId(x)
+ Change(x, pickValue, INSERT, LocalDateTime.now().toString)
+ }
+ sparkSession.createDataFrame(insertRows)
+ }
+
+ // generate change data for delete
+ private def generateRowsForDelete(sparkSession: SparkSession) = {
+ val deletedRows = (1 to numDeletePerBatch).map { x =>
+ val idIndex = random.nextInt(numOfIds)
+ Change(getAndRemoveId(idIndex), "", DELETE, LocalDateTime.now().toString)
+ }
+ sparkSession.createDataFrame(deletedRows)
+ }
+
+ // generate change data for update
+ private def generateRowsForUpdate(sparkSession: SparkSession) = {
+ val updatedRows = (1 to numUpdatePerBatch).map { x =>
+ val idIndex = random.nextInt(numOfIds)
+ Change(getId(idIndex), pickValue, UPDATE, LocalDateTime.now().toString)
+ }
+ sparkSession.createDataFrame(updatedRows)
+ }
+
+ // generate initial data for target table
+ private def generateTarget(sparkSession: SparkSession): Unit = {
+ print("generating target table...")
+ val time = timeIt { () =>
+ val insertRows = (1 to numInitialRows).map { x =>
+ addId(x)
+ Target(x, pickValue, "origin", LocalDateTime.now().toString)
+ }
+ // here we insert duplicated rows to simulate non primary key table
(which has repeated id)
+ val duplicatedRow = insertRows.union(insertRows)
+ val targetData = sparkSession.createDataFrame(duplicatedRow)
+ targetData.repartition(8)
+ .write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+ println(s"done! ${timeFormatted(time)}")
+ }
+
+ // generate change data
+ private def generateChange(sparkSession: SparkSession): Unit = {
+ val update = generateRowsForUpdate(sparkSession)
+ val delete = generateRowsForDelete(sparkSession)
+ val insert = generateRowsForInsert(sparkSession)
+
+ // union them so that the change contains IUD
+ update
+ .union(delete)
+ .union(insert)
+ .repartition(8)
+ .write
+ .format("carbondata")
+ .option("tableName", "change")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+
+ private def readTargetData(sparkSession: SparkSession): Dataset[Row] =
+ sparkSession.read
+ .format("carbondata")
+ .option("tableName", "target")
+ .load()
+
+ private def readChangeData(sparkSession: SparkSession): Dataset[Row] =
+ sparkSession.read
+ .format("carbondata")
+ .option("tableName", "change")
+ .load()
+
+ private def timeIt(func: () => Unit): Long = {
+ val start = System.nanoTime()
+ func()
+ System.nanoTime() - start
+ }
+
+ private def timeFormatted(updateTime: Long) = {
+ (updateTime.asInstanceOf[Double] / 1000 / 1000 / 1000).formatted("%.2f") +
" s"
+ }
+
+ private def printTarget(spark: SparkSession, i: Int) = {
+ if (printDetail) {
+ println(s"target table after CDC batch$i")
+ spark.sql("select * from target order by id").show(false)
+ }
+ }
+
+ private def printChange(spark: SparkSession, i: Int) = {
+ if (printDetail) {
+ println(s"CDC batch$i")
+ spark.sql("select * from change").show(100, false)
+ }
+ }
+
+ private def createSession = {
+ import org.apache.spark.sql.CarbonSession._
+ val rootPath = new File(this.getClass.getResource("/").getPath +
"../../../..").getCanonicalPath
+ val spark = SparkSession
+ .builder()
+ .master("local[8]")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir",
s"$rootPath/examples/spark/target/warehouse")
+ .getOrCreateCarbonSession()
+ spark
+ }
+
+ def main(args: Array[String]): Unit = {
+ CarbonProperties.setAuditEnabled(false);
+ val spark = createSession
+ spark.sparkContext.setLogLevel("error")
+
+ println(s"start CDC example using $solution solution")
+ spark.sql("drop table if exists target")
+ spark.sql("drop table if exists change")
+
+ // prepare target table
+ generateTarget(spark)
+
+ if (printDetail) {
+ println("## target table")
+ spark.sql("select * from target").show(100, false)
+ }
+
+ var updateTime = 0L
+
+ // Do CDC for N batch
+ (1 to numBatch).foreach { i =>
+ // prepare for change data
+ generateChange(spark)
+
+ printChange(spark, i)
+
+ // apply change to target table
+ val time = timeIt { () =>
+ print(s"applying change batch$i...")
+ if (solution.equals("carbon")) {
+ carbonSolution(spark)
+ } else {
+ hiveSolution(spark)
+ }
+ }
+ updateTime += time
+ println(s"done! ${timeFormatted(time)}")
+ printTarget(spark, i)
+ }
+
+ // do a query after all changes to compare query time
+ val queryTime = timeIt {
+ () => spark.sql("select * from target").collect()
+ }
+
+ // print update time
+ println(s"total update takes ${timeFormatted(updateTime)}")
+
+ // print query time
+ println(s"total query takes ${timeFormatted(queryTime)}")
+
+ spark.close()
+ }
+
+ /**
+ * Solution leveraging carbon's Merge syntax to apply change data
+ */
+ private def carbonSolution(spark: SparkSession) = {
+ import org.apache.spark.sql.CarbonSession._
+
+ // find the latest value for each key
+ val latestChangeForEachKey = readChangeData(spark)
+ .selectExpr("id", "struct(mdt, value, change_type) as otherCols" )
+ .groupBy("id")
+ .agg(max("otherCols").as("latest"))
+ .selectExpr("id", "latest.*")
+
+ val target = readTargetData(spark)
+ target.as("A")
+ .merge(latestChangeForEachKey.as("B"), "A.id = B.id")
+ .whenMatched("B.change_type = 'D'")
+ .delete()
+ .whenMatched("B.change_type = 'U'")
+ .updateExpr(
+ Map("id" -> "B.id", "value" -> "B.value", "remark" -> "'updated'",
"mdt" -> "B.mdt"))
+ .whenNotMatched("B.change_type = 'I'")
+ .insertExpr(
+ Map("id" -> "B.id", "value" -> "B.value", "remark" -> "'new'", "mdt"
-> "B.mdt"))
+ .execute()
+ }
+
+ /**
+ * Typical solution when using hive
+ * INSERT OVERWRITE to rewrite the whole table/partition for every CDC batch
+ */
+ private def hiveSolution(spark: SparkSession) = {
+ val latestChangeForEachKey = readChangeData(spark)
+ .selectExpr("id", "struct(mdt, value, change_type) as otherCols" )
+ .groupBy("id")
+ .agg(max("otherCols").as("latest"))
+ .selectExpr("id", "latest.*")
+ latestChangeForEachKey.createOrReplaceTempView("latest_change")
+ spark.sql(
+ """
+ | insert overwrite table target
+ | select * from
+ | (
+ | select A.id, B.value, 'updated', B.mdt
+ | from target A
+ | right join latest_change B
+ | on A.id = B.id
+ | where B.change_type = 'U'
+ | union all
+ | select B.id, B.value, 'new', B.mdt
+ | from latest_change B
+ | where B.change_type = 'I'
+ | union all
+ | select A.id, A.value, A.remark, A.mdt
+ | from target A
+ | left join latest_change B
+ | on A.id = B.id
+ | where B.id is null
+ | ) T
+ """.stripMargin)
+ }
+}
+// scalastyle:on println
diff --git
a/examples/spark/src/main/scala/org/apache/carbondata/examples/DedupExample.scala
b/examples/spark/src/main/scala/org/apache/carbondata/examples/DedupExample.scala
new file mode 100644
index 0000000..8986be3
--- /dev/null
+++
b/examples/spark/src/main/scala/org/apache/carbondata/examples/DedupExample.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.time.LocalDateTime
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.max
+
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.util.Auditor
+
+/**
+ * Deduplicate Example to show how to use CarbonData to avoid loading
duplicate record
+ * by using Merge syntax
+ *
+ * The schema of target table:
+ * (id int, value string, source_table string, mdt timestamp)
+ *
+ * The schema of change table:
+ * (id int, value string, change_type string, mdt timestamp)
+ *
+ * This example will insert change table into target table without duplicated
record
+ */
+// scalastyle:off println
+object DedupExample {
+
+ case class Target (id: Int, value: String, source_table: String, mdt: String)
+
+ case class Change (id: Int, value: String, change_type: String, mdt: String)
+
+ // number of records for target table before start CDC
+ val numInitialRows = 10
+
+ // number of records to insert for every batch
+ val numInsertPerBatch = 3
+
+ // number of duplicated records to update for every batch
+ val numDuplicatePerBatch = 4
+
+ // number of batch to simulate CDC
+ val numBatch = 2
+
+ // print result or not to console
+ val printDetail = true
+
+ val names = Seq("Amy", "Bob", "Lucy", "Roy", "Tony", "Mick", "Henry",
"Michael", "Carly",
+ "Emma", "Jade", "Josh", "Sue", "Ben", "Dale", "Chris", "Grace", "Emily")
+
+ private def pickName = names(Random.nextInt(names.size))
+
+ // IDs in the target table
+ private val currentIds = new java.util.ArrayList[Int]()
+ private def addId(id: Int) = currentIds.add(id)
+ private def maxId: Int = currentIds.asScala.max
+
+ private val INSERT = "I"
+
+ // generate data to insert to the target table
+ private def generateRowsForInsert(sparkSession: SparkSession) = {
+ // make some duplicated id by maxId - 2
+ val insertRows = (maxId - 2 to maxId + numInsertPerBatch).map { x =>
+ addId(x)
+ Change(x, pickName, INSERT, LocalDateTime.now().toString)
+ }
+ val insertData = sparkSession.createDataFrame(insertRows)
+
+ // make more duplicated records
+ val duplicatedData = insertData.union(insertData)
+ duplicatedData.write
+ .format("carbondata")
+ .option("tableName", "change")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+
+ // generate initial data for target table
+ private def generateTarget(sparkSession: SparkSession) = {
+ val insertRows = (1 to numInitialRows).map { x =>
+ addId(x)
+ Target(x, pickName, "table1", LocalDateTime.now().toString)
+ }
+ val targetData = sparkSession.createDataFrame(insertRows)
+ targetData.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+
+ private def readTargetData(sparkSession: SparkSession): Dataset[Row] =
+ sparkSession.read
+ .format("carbondata")
+ .option("tableName", "target")
+ .load()
+
+ private def readChangeData(sparkSession: SparkSession): Dataset[Row] =
+ sparkSession.read
+ .format("carbondata")
+ .option("tableName", "change")
+ .load()
+
+ private def printChange(spark: SparkSession, i: Int) = {
+ if (printDetail) {
+ println(s"Insert batch$i")
+ spark.sql("select * from change").show(100, false)
+ }
+ }
+
+ private def printTarget(spark: SparkSession, i: Int) = {
+ if (printDetail) {
+ println(s"target table after insert batch$i")
+ spark.sql("select * from target order by id").show(false)
+ }
+ }
+
+ private def printTarget(spark: SparkSession) = {
+ if (printDetail) {
+ println("## target table")
+ spark.sql("select * from target").show(100, false)
+ }
+ }
+
+ private def createSession = {
+ import org.apache.spark.sql.CarbonSession._
+ val rootPath = new File(this.getClass.getResource("/").getPath +
"../../../..").getCanonicalPath
+ val spark = SparkSession
+ .builder()
+ .master("local[8]")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir",
s"$rootPath/examples/spark/target/warehouse")
+ .getOrCreateCarbonSession()
+ spark.sparkContext.setLogLevel("error")
+ spark
+ }
+
+ def main(args: Array[String]): Unit = {
+ CarbonProperties.setAuditEnabled(false);
+
+ val spark: SparkSession = createSession
+
+ spark.sql("drop table if exists target")
+ spark.sql("drop table if exists change")
+
+ // prepare target data
+ generateTarget(spark)
+
+ printTarget(spark)
+
+ (1 to numBatch).foreach { i =>
+ // prepare for change data
+ generateRowsForInsert(spark)
+
+ printChange(spark, i)
+
+ // apply Change to history table by using MERGE
+ dedupAndInsert(spark)
+
+ printTarget(spark, i)
+ }
+
+ spark.close()
+ }
+
+ /**
+ * Leveraging carbon's Merge syntax to perform data deduplication
+ */
+ private def dedupAndInsert(spark: SparkSession) = {
+ import org.apache.spark.sql.CarbonSession._
+
+ // find the latest value for each key
+ val latestChangeForEachKey = readChangeData(spark)
+ .selectExpr("id", "struct(mdt, value, change_type) as otherCols" )
+ .groupBy("id")
+ .agg(max("otherCols").as("latest"))
+ .selectExpr("id", "latest.*")
+
+ val target = readTargetData(spark)
+ target.as("A")
+ .merge(latestChangeForEachKey.as("B"), "A.id = B.id")
+ .whenNotMatched()
+ .insertExpr(
+ Map("id" -> "B.id", "value" -> "B.value", "source_table" ->
"'table1'", "mdt" -> "B.mdt"))
+ .execute()
+ }
+
+}
+// scalastyle:on println
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 441c96a..b16d3c3 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -95,14 +95,16 @@ case class CarbonMergeDataSetCommand(
// Lets generate all conditions combinations as one column and add them as
'status'.
val condition = generateStatusColumnWithAllCombinations(mergeMatches)
+ // decide join type based on match conditions
+ val joinType = decideJoinType
+
// Add the tupleid udf to get the tupleid to generate delete delta.
- val frame =
targetDs.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
- expr("getTupleId()")).withColumn("exist_on_target", lit(1)).join(
- srcDS.withColumn("exist_on_src", lit(1)),
- // Do the full outer join to get the data from both sides without
missing anything.
- // TODO As per the match conditions choose the join, sometimes it might
be possible to use
- // left_outer join.
- mergeMatches.joinExpr, "full_outer").withColumn("status", condition)
+ val frame =
+ targetDs
+ .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
+ .withColumn("exist_on_target", lit(1))
+ .join(srcDS.withColumn("exist_on_src", lit(1)), mergeMatches.joinExpr,
joinType)
+ .withColumn("status", condition)
if (LOGGER.isDebugEnabled) {
frame.explain()
}
@@ -202,6 +204,36 @@ case class CarbonMergeDataSetCommand(
Seq.empty
}
+ // Decide join type based on match conditions
+ private def decideJoinType: String = {
+ if (containsWhenNotMatchedOnly) {
+ // if match condition contains WhenNotMatched only, then we do not need
+ // left table key and matched key
+ "right_outer"
+ } else if (containsWhenMatchedOnly) {
+ // if match condition contains WhenMatched only, then we need matched
key only
+ "inner"
+ } else if (needKeyFromLeftTable) {
+ // if we need to keep keys from left table, then use full outer join
+ "full_outer"
+ } else {
+ // default join type
+ "right"
+ }
+ }
+
+ private def needKeyFromLeftTable: Boolean = {
+
mergeMatches.matchList.exists(_.isInstanceOf[WhenNotMatchedAndExistsOnlyOnTarget])
+ }
+
+ private def containsWhenMatchedOnly: Boolean = {
+ mergeMatches.matchList.forall(_.isInstanceOf[WhenMatched])
+ }
+
+ private def containsWhenNotMatchedOnly: Boolean = {
+ mergeMatches.matchList.forall(_.isInstanceOf[WhenNotMatched])
+ }
+
/**
* As per the status of the row either it inserts the data or update/delete
the data.
*/
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/util/Auditor.java
b/processing/src/main/java/org/apache/carbondata/processing/util/Auditor.java
index 2bcd4d3..bd027d4 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/util/Auditor.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/util/Auditor.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.impl.AuditLevel;
+import org.apache.carbondata.core.util.CarbonProperties;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -64,12 +65,14 @@ public class Auditor {
* @param opId operation unique id
*/
public static void logOperationStart(String opName, String opId) {
- Objects.requireNonNull(opName);
- Objects.requireNonNull(opId);
- OpStartMessage message = new OpStartMessage(opName, opId);
- Gson gson = new GsonBuilder().disableHtmlEscaping().create();
- String json = gson.toJson(message);
- LOGGER.log(AuditLevel.AUDIT, json);
+ if (CarbonProperties.isAuditEnabled()) {
+ Objects.requireNonNull(opName);
+ Objects.requireNonNull(opId);
+ OpStartMessage message = new OpStartMessage(opName, opId);
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ String json = gson.toJson(message);
+ LOGGER.log(AuditLevel.AUDIT, json);
+ }
}
/**
@@ -83,14 +86,16 @@ public class Auditor {
*/
public static void logOperationEnd(String opName, String opId, boolean
success, String table,
String opTime, Map<String, String> extraInfo) {
- Objects.requireNonNull(opName);
- Objects.requireNonNull(opId);
- Objects.requireNonNull(opTime);
- OpEndMessage message = new OpEndMessage(opName, opId, table, opTime,
- success ? OpStatus.SUCCESS : OpStatus.FAILED,
- extraInfo != null ? extraInfo : new HashMap<String, String>());
- String json = gson.toJson(message);
- LOGGER.log(AuditLevel.AUDIT, json);
+ if (CarbonProperties.isAuditEnabled()) {
+ Objects.requireNonNull(opName);
+ Objects.requireNonNull(opId);
+ Objects.requireNonNull(opTime);
+ OpEndMessage message = new OpEndMessage(opName, opId, table, opTime,
+ success ? OpStatus.SUCCESS : OpStatus.FAILED,
+ extraInfo != null ? extraInfo : new HashMap<String, String>());
+ String json = gson.toJson(message);
+ LOGGER.log(AuditLevel.AUDIT, json);
+ }
}
private enum OpStatus {