This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3be05d2 [CARBONDATA-4296]: schema evolution, enforcement and deduplication utilities added
3be05d2 is described below
commit 3be05d2a44d805cf763df05cbeacce2d90a44da0
Author: pratyakshsharma <[email protected]>
AuthorDate: Wed Oct 27 13:54:37 2021 +0530
[CARBONDATA-4296]: schema evolution, enforcement and deduplication utilities added
Why is this PR needed?
This PR adds schema enforcement, schema evolution and deduplication capabilities
specifically for the carbondata streamer tool. For the existing IUD scenarios, some
work is still needed to handle them completely, for example -
1. Passing default values and storing them in table properties.
Changes proposed for phase 2 -
1. Handling delete use cases with the upsert operation/command itself. Right now an
update is treated as delete + insert. With the new streamer tool, it is possible that
the user sets upsert as the operation type while the incoming stream also contains
delete records.
What changes were proposed in this PR?
Configs and utility methods are added for the following use cases (see the sketches
after this list and after the diffstat) -
1. Schema enforcement
2. Schema evolution - add column, delete column and data type change scenarios
3. Deduplicating the incoming dataset against itself. This is useful when the
incoming stream has multiple updates for the same record and we want to pick the
latest one.
4. Deduplicating the incoming dataset against the existing target dataset. This is
useful when the operation type is INSERT and the user does not want to insert
duplicate records.
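The snippet below is a minimal usage sketch (not part of this commit) showing how the
new properties can be wired up before invoking the upsert API exercised in
MergeTestCase further down. The table name "target", the key column "key", the
ordering column "value" and the dataframe incomingBatch are illustrative placeholders.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    val properties = CarbonProperties.getInstance()
    // column used to pick the latest update per record key when deduplicating the batch
    properties.addProperty(
      CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value")
    // combine multiple updates for the same key within the incoming batch (UPSERT/UPDATE)
    properties.addProperty(
      CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE, "true")
    // fail the batch if the incoming schema does not match the target table schema
    properties.addProperty(
      CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true")

    // assumes a SparkSession `spark` and the CarbonData merge API implicits
    // used in MergeTestCase are in scope
    val target = spark.read.format("carbondata").option("tableName", "target").load()
    target.as("A").upsert(incomingBatch.as("B"), "key").execute()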
This closes #4227
---
.../exceptions/sql/CarbonSchemaException.java | 39 +++
.../core/constants/CarbonCommonConstants.java | 54 +++
.../mutation/merge/CarbonMergeDataSetCommand.scala | 80 ++++-
.../mutation/merge/CarbonMergeDataSetUtil.scala | 390 ++++++++++++++++++++-
.../spark/sql/execution/strategy/DDLHelper.scala | 97 +++--
.../spark/testsuite/merge/MergeTestCase.scala | 264 +++++++++++---
6 files changed, 824 insertions(+), 100 deletions(-)
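For reference, the two deduplication steps implemented by the utilities in this patch
can be pictured with plain Spark operations as below. This is only an equivalent
sketch - the actual implementation uses an RDD reduceByKey with a type-aware
comparator - and the dataframes incoming and existingTarget as well as the columns
"key" and "value" are illustrative.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // 1. Deduplicate the incoming batch against itself: keep the latest row per key,
    //    "latest" being decided by the configured source ordering field (here "value").
    val window = Window.partitionBy("key").orderBy(col("value").desc)
    val latestPerKey = incoming
      .withColumn("_rn", row_number().over(window))
      .where(col("_rn") === 1)
      .drop("_rn")

    // 2. For INSERT, deduplicate against the existing target dataset: drop rows whose
    //    key already exists in the target (the utility uses the same left_anti join).
    val newRowsOnly = latestPerKey.join(existingTarget, Seq("key"), "left_anti")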
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/CarbonSchemaException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/CarbonSchemaException.java
new file mode 100644
index 0000000..522f132
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/CarbonSchemaException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
[email protected]
[email protected]
+public class CarbonSchemaException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String message;
+
+ public CarbonSchemaException(String message) {
+ super(message);
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return this.message;
+ }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index d72d6c1..f24108a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2681,4 +2681,58 @@ public final class CarbonCommonConstants {
public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = "false";
+
+ //////////////////////////////////////////////////////////////////////////////////////////
+ // CDC streamer configs start here
+ //////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Name of the field from source schema whose value can be used for picking the latest updates
+ * for a particular record in the incoming batch in case of duplicate record keys. Useful if the
+ * write operation type is UPDATE or UPSERT. This will be used only if
+ * carbon.streamer.upsert.deduplicate is enabled.
+ */
+ @CarbonProperty public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD =
+ "carbon.streamer.source.ordering.field";
+
+ public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT = "";
+
+ /**
+ * This property specifies if the incoming batch needs to be deduplicated in case of INSERT
+ * operation type. If set to true, the incoming batch will be deduplicated against the existing
+ * data in the target carbondata table.
+ */
+ @CarbonProperty public static final String CARBON_STREAMER_INSERT_DEDUPLICATE =
+ "carbon.streamer.insert.deduplicate";
+
+ public static final String CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT = "false";
+
+ /**
+ * This property specifies if the incoming batch needs to be deduplicated (when multiple updates
+ * for the same record key are present in the incoming batch) in case of UPSERT/UPDATE operation
+ * type. If set to true, the user needs to provide a proper value for the source ordering field
+ * as well.
+ */
+ @CarbonProperty public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE =
+ "carbon.streamer.upsert.deduplicate";
+
+ public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT = "true";
+
+ /**
+ * The metadata columns coming from the source stream data, which should not be included in the
+ * target data.
+ */
+ @CarbonProperty public static final String CARBON_STREAMER_META_COLUMNS =
+ "carbon.streamer.meta.columns";
+
+ /**
+ * This flag decides if the table schema needs to change as per the incoming batch schema.
+ * If set to true, the incoming schema will be validated against the existing table schema.
+ * If the schema has evolved, the incoming batch cannot be ingested and the job will simply fail.
+ */
+ @CarbonProperty public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT =
+ "carbon.enable.schema.enforcement";
+
+ public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
+
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index d29b531..c16e901 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID,
TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, CarbonToSparkAdapter, Column,
DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv,
CarbonToSparkAdapter, Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.avro.AvroFileFormatFactory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -43,6 +43,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType,
StructField, StructT
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata,
LongAccumulator}
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -98,8 +99,35 @@ case class CarbonMergeDataSetCommand(
throw new UnsupportedOperationException(
"Carbon table supposed to be present in merge dataset")
}
+
+ val properties = CarbonProperties.getInstance()
+ if (operationType != null) {
+ val filterDupes = properties
+ .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+ val isSchemaEnforcementEnabled = properties
+ .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+ if (
+
!MergeOperationType.withName(operationType.toUpperCase).equals(MergeOperationType.INSERT)
&&
+ filterDupes) {
+ throw new MalformedCarbonCommandException("property
CARBON_STREAMER_INSERT_DEDUPLICATE" +
+ " should only be set with
operation type INSERT")
+ }
+ if (isSchemaEnforcementEnabled) {
+ // call the util function to verify if incoming schema matches with
target schema
+ CarbonMergeDataSetUtil.verifySourceAndTargetSchemas(targetDsOri, srcDS)
+ } else {
+ CarbonMergeDataSetUtil.handleSchemaEvolution(
+ targetDsOri, srcDS, sparkSession)
+ }
+ }
+
// Target dataset must be backed by carbondata table.
- val targetCarbonTable = relations.head.carbonRelation.carbonTable
+ val tgtTable = relations.head.carbonRelation.carbonTable
+ val targetCarbonTable: CarbonTable =
CarbonEnv.getCarbonTable(Option(tgtTable.getDatabaseName),
+ tgtTable.getTableName)(sparkSession)
+
// select only the required columns, it can avoid lot of and shuffling.
val targetDs = if (mergeMatches == null && operationType != null) {
targetDsOri.select(keyColumn)
@@ -149,11 +177,29 @@ case class CarbonMergeDataSetCommand(
joinColumns.map(srcDS.col): _*)
} else {
srcDS
- }
+ }
+
+ // deduplicate the incoming dataset
+ // TODO: handle the case for partial updates
+ val orderingField = properties.getProperty(
+ CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD,
+ CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT)
+ val deduplicatedSrcDs = if (keyColumn != null) {
+ CarbonMergeDataSetUtil.deduplicateBeforeWriting(repartitionedSrcDs,
+ targetDs,
+ sparkSession,
+ sourceAliasName,
+ targetDsAliasName,
+ keyColumn,
+ orderingField,
+ targetCarbonTable)
+ } else {
+ repartitionedSrcDs
+ }
// cache the source data as we will be scanning multiple times
- repartitionedSrcDs.cache()
- val deDuplicatedRecords = repartitionedSrcDs.count()
+ deduplicatedSrcDs.cache()
+ val deDuplicatedRecords = deduplicatedSrcDs.count()
LOGGER.info(s"Number of records from source data: $deDuplicatedRecords")
// Create accumulators to log the stats
val stats = Stats(createLongAccumulator("insertedRows"),
@@ -221,7 +267,7 @@ case class CarbonMergeDataSetCommand(
new util.LinkedHashMap[String, util.List[FilePathMinMaxVO]]
val colToSplitsFilePathAndMinMaxMap: mutable.Map[String,
util.List[FilePathMinMaxVO]] =
CarbonMergeDataSetUtil.getSplitsAndLoadToCache(targetCarbonTable,
- repartitionedSrcDs,
+ deduplicatedSrcDs,
columnMinMaxInBlocklet,
columnToIndexMap,
sparkSession)
@@ -281,7 +327,7 @@ case class CarbonMergeDataSetCommand(
// find the file paths to scan.
finalCarbonFilesToScan =
CarbonMergeDataSetUtil.getFilesToScan(joinCarbonColumns,
joinColumnToTreeMapping,
- repartitionedSrcDs)
+ deduplicatedSrcDs)
LOGGER.info(s"Finished min-max pruning. Carbondata files to scan
during merge is: ${
finalCarbonFilesToScan.length}")
@@ -298,7 +344,7 @@ case class CarbonMergeDataSetCommand(
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
- .join(repartitionedSrcDs.select(keyColumn),
+ .join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn =
$sourceAliasName.$keyColumn"),
joinType)
} else {
@@ -308,7 +354,7 @@ case class CarbonMergeDataSetCommand(
if (!isInsertOperation) {
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
- .join(repartitionedSrcDs.select(keyColumn),
+ .join(deduplicatedSrcDs.select(keyColumn),
expr(s"$targetDsAliasName.$keyColumn =
$sourceAliasName.$keyColumn"),
joinType)
} else {
@@ -318,13 +364,13 @@ case class CarbonMergeDataSetCommand(
val mergeHandler: MergeHandler =
MergeOperationType.withName(operationType.toUpperCase) match {
case MergeOperationType.UPSERT =>
- UpsertHandler(sparkSession, frame, targetCarbonTable, stats,
repartitionedSrcDs)
+ UpsertHandler(sparkSession, frame, targetCarbonTable, stats,
deduplicatedSrcDs)
case MergeOperationType.UPDATE =>
- UpdateHandler(sparkSession, frame, targetCarbonTable, stats,
repartitionedSrcDs)
+ UpdateHandler(sparkSession, frame, targetCarbonTable, stats,
deduplicatedSrcDs)
case MergeOperationType.DELETE =>
- DeleteHandler(sparkSession, frame, targetCarbonTable, stats,
repartitionedSrcDs)
+ DeleteHandler(sparkSession, frame, targetCarbonTable, stats,
deduplicatedSrcDs)
case MergeOperationType.INSERT =>
- InsertHandler(sparkSession, frame, targetCarbonTable, stats,
repartitionedSrcDs)
+ InsertHandler(sparkSession, frame, targetCarbonTable, stats,
deduplicatedSrcDs)
}
// execute merge handler
@@ -332,7 +378,7 @@ case class CarbonMergeDataSetCommand(
LOGGER.info(
" Time taken to merge data :: " + (System.currentTimeMillis() - st))
// clear the cached src
- repartitionedSrcDs.unpersist()
+ deduplicatedSrcDs.unpersist()
return Seq()
}
// validate the merge matches and actions.
@@ -354,7 +400,7 @@ case class CarbonMergeDataSetCommand(
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.where(s"getBlockPaths('${finalCarbonFilesToScan.mkString(",")}')")
- .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
+ .join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
@@ -362,7 +408,7 @@ case class CarbonMergeDataSetCommand(
targetDs
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
- .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
+ .join(deduplicatedSrcDs.withColumn("exist_on_src", lit(1)),
mergeMatches.joinExpr,
joinType)
.withColumn(status_on_mergeds, condition)
@@ -451,7 +497,7 @@ case class CarbonMergeDataSetCommand(
HorizontalCompaction.tryHorizontalCompaction(
sparkSession, targetCarbonTable)
// clear the cached src
- repartitionedSrcDs.unpersist()
+ deduplicatedSrcDs.unpersist()
Seq.empty
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
index d719152..23a1227 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
@@ -16,23 +16,34 @@
*/
package org.apache.spark.sql.execution.command.mutation.merge
+import java.nio.charset.Charset
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row,
SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
+import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.EqualTo
import org.apache.spark.sql.execution.CastExpressionOptimization
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel,
AlterTableDataTypeChangeModel, AlterTableDropColumnModel}
+import
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
CarbonAlterTableColRenameDataTypeChangeCommand,
CarbonAlterTableDropColumnCommand}
+import org.apache.spark.sql.execution.strategy.DDLHelper
+import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.DateType
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.{DateType, DecimalType, StructField}
+import org.apache.carbondata.common.exceptions.sql.CarbonSchemaException
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.index.{IndexChooser, IndexInputFormat,
IndexStoreManager, IndexUtil}
import org.apache.carbondata.core.indexstore.PartitionSpec
import
org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexRowIndexes
+import org.apache.carbondata.core.metadata.datatype.{DataType =>
CarbonDataType}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
@@ -40,7 +51,7 @@ import org.apache.carbondata.core.mutate.{CdcVO,
FilePathMinMaxVO}
import org.apache.carbondata.core.range.BlockMinMaxTree
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties,
CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.comparator.SerializableComparator
+import org.apache.carbondata.core.util.comparator.{Comparator,
SerializableComparator}
import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -49,6 +60,8 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
*/
object CarbonMergeDataSetUtil {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
/**
* This method reads the splits and makes (blockPath, (min, max)) tuples to do min-max pruning of the src dataset
@@ -462,4 +475,375 @@ object CarbonMergeDataSetUtil {
columnMinMaxInBlocklet.asScala
}
}
+
+ /**
+ * This method verifies source and target schemas for the following:
+ * If additional columns are present in source schema as compared to target, simply ignore them.
+ * If some columns are missing in source schema as compared to target schema, exception is thrown.
+ * If data type of some column differs in source and target schemas, exception is thrown.
+ * If source schema has multiple columns whose names differ only in case sensitivity, exception
+ * is thrown.
+ *
+ * @param targetDs target carbondata table
+ * @param srcDs source/incoming data
+ */
+ def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs:
Dataset[Row]): Unit = {
+ LOGGER.info("schema enforcement is enabled. Source and target schemas will
be verified")
+ // get the source and target dataset schema
+ val sourceSchema = srcDs.schema
+ val targetSchema = targetDs.schema
+
+ verifyBackwardsCompatibility(targetDs, srcDs)
+
+ val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+
+ // check if some additional column got added in source schema
+ if (sourceSchema.fields.length > targetSchema.fields.length) {
+ val tgtSchemaInLowerCase = targetSchema.fields.map(_.name.toLowerCase)
+ val additionalSourceFields = lowerCaseSrcSchemaFields
+ .filterNot(srcField => {
+ tgtSchemaInLowerCase.contains(srcField)
+ })
+ if (additionalSourceFields.nonEmpty) {
+ LOGGER.warn(s"source schema contains additional fields which are not
present in " +
+ s"target schema: ${ additionalSourceFields.mkString(",")
}")
+ }
+ }
+ }
+
+ def verifyCaseSensitiveFieldNames(
+ lowerCaseSrcSchemaFields: Array[String]
+ ): Unit = {
+ // check if source schema has fields whose names only differ in case
sensitivity
+ val similarFields = lowerCaseSrcSchemaFields.groupBy(a => identity(a)).map
{
+ case (str, times) => (str, times.length)
+ }.toList.filter(e => e._2 > 1).map(_._1)
+ if (similarFields.nonEmpty) {
+ val errorMsg = s"source schema has similar fields which differ only in
case sensitivity: " +
+ s"${ similarFields.mkString(",") }"
+ LOGGER.error(errorMsg)
+ throw new CarbonSchemaException(errorMsg)
+ }
+ }
+
+ /**
+ * This method takes care of handling schema evolution scenarios for CarbonStreamer class.
+ * Currently only addition of columns is supported.
+ *
+ * @param targetDs target dataset whose schema needs to be modified, if applicable
+ * @param srcDs incoming dataset
+ * @param sparkSession SparkSession
+ */
+ def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs:
Dataset[Row],
+ sparkSession: SparkSession): Unit = {
+ // read the property here
+ val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+ if (isSchemaEnforcementEnabled) {
+ verifySourceAndTargetSchemas(targetDs, srcDs)
+ } else {
+ // These meta columns should be removed before actually writing the data
+ val metaColumnsString = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+ val metaCols = metaColumnsString.split(",").map(_.trim)
+ val srcDsWithoutMeta = if (metaCols.length > 0) {
+ srcDs.drop(metaCols: _*)
+ } else {
+ srcDs
+ }
+ handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession,
isStreamerInvolved = true)
+ }
+ }
+
+ def verifyBackwardsCompatibility(
+ targetDs: Dataset[Row],
+ srcDs: Dataset[Row]): Unit = {
+ val sourceSchema = srcDs.schema
+ val targetSchema = targetDs.schema
+
+ targetSchema.fields.foreach(tgtField => {
+ val sourceField = sourceSchema.fields
+ .find(f => f.name.equalsIgnoreCase(tgtField.name))
+ // check if some field is missing in source schema
+ if (sourceField.isEmpty) {
+ val errorMsg = s"source schema does not contain field: ${
tgtField.name }"
+ LOGGER.error(errorMsg)
+ throw new CarbonSchemaException(errorMsg)
+ }
+
+ // check if data type got modified for some column
+ if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+ val errorMsg = s"source schema has different data type " +
+ s"for field: ${ tgtField.name }"
+ LOGGER.error(errorMsg + s", source type: ${ sourceField.get.dataType
}, " +
+ s"target type: ${ tgtField.dataType }")
+ throw new CarbonSchemaException(errorMsg)
+ }
+ })
+
+ val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+ verifyCaseSensitiveFieldNames(lowerCaseSrcSchemaFields)
+ }
+
+ /**
+ * The method takes care of following schema evolution cases:
+ * Addition of a new column in source schema which is not present in target
+ * Deletion of a column in source schema which is present in target
+ * Data type changes for an existing column.
+ * The method does not take care of column renames and table renames
+ *
+ * @param targetDs existing target dataset
+ * @param srcDs incoming source dataset
+ * @return new target schema to write the incoming batch with
+ */
+ def handleSchemaEvolution(
+ targetDs: Dataset[Row],
+ srcDs: Dataset[Row],
+ sparkSession: SparkSession,
+ isStreamerInvolved: Boolean = false): Unit = {
+
+ /*
+ If the method is called from CarbonStreamer, we need to ensure the schema is evolved in a
+ backwards compatible way. In phase 1, only addition of columns is supported, hence this check is
+ needed to ensure data integrity.
+ The existing IUD flow supports full schema evolution, hence this check is not needed for
+ existing flows.
+ */
+ if (isStreamerInvolved) {
+ verifyBackwardsCompatibility(targetDs, srcDs)
+ }
+ val sourceSchema = srcDs.schema
+ val targetSchema = targetDs.schema
+
+ // check if any column got added in source
+ val addedColumns = sourceSchema.fields
+ .filterNot(field => targetSchema.fields.map(_.name).contains(field.name))
+ val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+ val targetCarbonTable = relations.head.carbonRelation.carbonTable
+ if (addedColumns.nonEmpty) {
+ handleAddColumnScenario(targetDs,
+ sourceSchema.fields.filter(f => addedColumns.contains(f)).toSeq,
+ sparkSession,
+ targetCarbonTable)
+ }
+
+ // check if any column got deleted from source
+ val partitionInfo = targetCarbonTable.getPartitionInfo
+ val partitionColumns = if (partitionInfo != null) {
+ partitionInfo.getColumnSchemaList.asScala
+ .map(_.getColumnName).toList
+ } else {
+ List[String]()
+ }
+ val srcSchemaFieldsInLowerCase =
sourceSchema.fields.map(_.name.toLowerCase)
+ val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+ .filterNot(f => {
+ srcSchemaFieldsInLowerCase.contains(f) ||
+ partitionColumns.contains(f)
+ })
+ if (deletedColumns.nonEmpty) {
+ handleDeleteColumnScenario(targetDs, deletedColumns.toList,
sparkSession, targetCarbonTable)
+ }
+
+ val modifiedColumns = targetSchema.fields.filter(tgtField => {
+ val sourceField = sourceSchema.fields.find(f =>
f.name.equalsIgnoreCase(tgtField.name))
+ if (sourceField.isDefined) {
+ !sourceField.get.dataType.equals(tgtField.dataType)
+ } else {
+ false
+ }
+ })
+
+ if (modifiedColumns.nonEmpty) {
+ handleDataTypeChangeScenario(targetDs,
+ modifiedColumns.toList,
+ sparkSession,
+ targetCarbonTable)
+ }
+ }
+
+ /**
+ * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+ *
+ * @param targetDs target dataset whose schema needs to be modified
+ * @param colsToAdd new columns to be added
+ * @param sparkSession SparkSession
+ */
+ def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd:
Seq[StructField],
+ sparkSession: SparkSession,
+ targetCarbonTable: CarbonTable): Unit = {
+ val alterTableAddColsCmd = DDLHelper.prepareAlterTableAddColsCommand(
+ Option(targetCarbonTable.getDatabaseName),
+ colsToAdd,
+ targetCarbonTable.getTableName.toLowerCase)
+ alterTableAddColsCmd.run(sparkSession)
+ }
+
+ /**
+ * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+ *
+ * @param targetDs target dataset whose schema needs to be modified
+ * @param colsToDrop columns to be dropped from carbondata table
+ * @param sparkSession SparkSession
+ */
+ def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop:
List[String],
+ sparkSession: SparkSession,
+ targetCarbonTable: CarbonTable): Unit = {
+ val alterTableDropColumnModel = AlterTableDropColumnModel(
+
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+ targetCarbonTable.getTableName.toLowerCase,
+ colsToDrop.map(_.toLowerCase))
+
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+ }
+
+ /**
+ * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for
handling data type changes
+ *
+ * @param targetDs target dataset whose schema needs to be modified
+ * @param modifiedCols columns with data type changes
+ * @param sparkSession SparkSession
+ */
+ def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols:
List[StructField],
+ sparkSession: SparkSession,
+ targetCarbonTable: CarbonTable): Unit = {
+ // need to call the command one by one for each modified column
+ modifiedCols.foreach(col => {
+ val alterTableColRenameDataTypeChangeCommand = DDLHelper
+ .prepareAlterTableColRenameDataTypeChangeCommand(
+ col,
+ Option(targetCarbonTable.getDatabaseName.toLowerCase),
+ targetCarbonTable.getTableName.toLowerCase,
+ col.name.toLowerCase,
+ isColumnRename = false,
+ Option.empty)
+ alterTableColRenameDataTypeChangeCommand.run(sparkSession)
+ })
+ }
+
+ def deduplicateBeforeWriting(
+ srcDs: Dataset[Row],
+ targetDs: Dataset[Row],
+ sparkSession: SparkSession,
+ srcAlias: String,
+ targetAlias: String,
+ keyColumn: String,
+ orderingField: String,
+ targetCarbonTable: CarbonTable): Dataset[Row] = {
+ val properties = CarbonProperties.getInstance()
+ val filterDupes = properties
+ .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+ val combineBeforeUpsert = properties
+ .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+ var dedupedDataset: Dataset[Row] = srcDs
+ if (combineBeforeUpsert) {
+ dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession,
srcAlias, keyColumn,
+ orderingField, targetCarbonTable)
+ }
+ if (filterDupes) {
+ dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset,
targetDs,
+ srcAlias, targetAlias, keyColumn)
+ }
+ dedupedDataset
+ }
+
+ def deduplicateAgainstIncomingDataset(
+ srcDs: Dataset[Row],
+ sparkSession: SparkSession,
+ srcAlias: String,
+ keyColumn: String,
+ orderingField: String,
+ table: CarbonTable): Dataset[Row] = {
+ if
(orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT))
{
+ return srcDs
+ }
+ val schema = srcDs.schema
+ val carbonKeyColumn = table.getColumnByName(keyColumn)
+ val keyColumnDataType = getCarbonDataType(keyColumn, srcDs)
+ val orderingFieldDataType = getCarbonDataType(orderingField, srcDs)
+ val isPrimitiveAndNotDate =
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+ (orderingFieldDataType != DataTypes.DATE)
+ val comparator = Comparator.getComparator(orderingFieldDataType)
+ val rdd = srcDs.rdd
+ val dedupedRDD: RDD[Row] = rdd.map { row =>
+ val index = row.fieldIndex(keyColumn)
+ val rowKey = getRowKey(row, index, carbonKeyColumn,
isPrimitiveAndNotDate, keyColumnDataType)
+ (rowKey, row)
+ }.reduceByKey { (row1, row2) =>
+ var orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any]
+ var orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any]
+ if (orderingValue1 == null) {
+ row2
+ } else if (orderingValue2 == null) {
+ row1
+ } else {
+ if (orderingFieldDataType.equals(DataTypes.STRING)) {
+ orderingValue1 = orderingValue1.toString
+ .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+ orderingValue2 = orderingValue2.toString
+ .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+ }
+ if (comparator.compare(orderingValue1, orderingValue2) >= 0) {
+ row1
+ } else {
+ row2
+ }
+ }
+ }.map(_._2)
+ sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias)
+ }
+
+ def getRowKey(
+ row: Row,
+ index: Integer,
+ carbonKeyColumn: CarbonColumn,
+ isPrimitiveAndNotDate: Boolean,
+ keyColumnDataType: CarbonDataType
+ ): AnyRef = {
+ if (!row.isNullAt(index)) {
+ row.getAs(index).toString
+ } else {
+ val value: Long = 0
+ if (carbonKeyColumn.isDimension) {
+ if (isPrimitiveAndNotDate) {
+ CarbonCommonConstants.EMPTY_BYTE_ARRAY
+ } else {
+ CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
+ }
+ } else {
+ val nullValueForMeasure = keyColumnDataType match {
+ case DataTypes.BOOLEAN | DataTypes.BYTE => value.toByte
+ case DataTypes.SHORT => value.toShort
+ case DataTypes.INT => value.toInt
+ case DataTypes.DOUBLE => 0d
+ case DataTypes.FLOAT => 0f
+ case DataTypes.LONG | DataTypes.TIMESTAMP => value
+ case _ => value
+ }
+ CarbonUtil.getValueAsBytes(keyColumnDataType, nullValueForMeasure)
+ }
+ }
+ }
+
+ def getCarbonDataType(
+ fieldName: String,
+ srcDs: Dataset[Row]
+ ): CarbonDataType = {
+ val schema = srcDs.schema
+ val dataType = schema.fields.find(f =>
f.name.equalsIgnoreCase(fieldName)).get.dataType
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)
+ }
+
+ def deduplicateAgainstExistingDataset(
+ srcDs: Dataset[Row],
+ targetDs: Dataset[Row],
+ srcAlias: String,
+ targetAlias: String,
+ keyColumn: String
+ ): Dataset[Row] = {
+ srcDs.join(targetDs,
+ expr(s"$srcAlias.$keyColumn = $targetAlias.$keyColumn"), "left_anti")
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
index fa9615d..1839820 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.table._
import org.apache.spark.sql.execution.datasources.{LogicalRelation,
RefreshResource}
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
import org.apache.spark.sql.parser.{CarbonSpark2SqlParser,
CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{DecimalType, StructField}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.CarbonReflectionUtils
@@ -185,26 +185,33 @@ object DDLHelper {
throw new MalformedCarbonCommandException(
"Unsupported operation on non transactional table")
} else {
- val fields = new
CarbonSpark2SqlParser().getFields(addColumnsCommand.colsToAdd)
- val tableModel = CarbonParserUtil.prepareTableModel(false,
- CarbonParserUtil.convertDbNameToLowerCase(table.database),
- table.table.toLowerCase,
- fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
- Seq.empty,
- scala.collection.mutable.Map.empty[String, String],
- None,
- true)
- val alterTableAddColumnsModel = AlterTableAddColumnsModel(
- CarbonParserUtil.convertDbNameToLowerCase(table.database),
- table.table.toLowerCase,
- Map.empty[String, String],
- tableModel.dimCols,
- tableModel.msrCols,
- tableModel.highCardinalityDims.getOrElse(Seq.empty))
- CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+ prepareAlterTableAddColsCommand(table.database,
addColumnsCommand.colsToAdd,
+ table.table.toLowerCase)
}
}
+ def prepareAlterTableAddColsCommand(dbName: Option[String],
+ colsToAdd: Seq[StructField],
+ tableName: String): CarbonAlterTableAddColumnCommand = {
+ val fields = new CarbonSpark2SqlParser().getFields(colsToAdd)
+ val tableModel = CarbonParserUtil.prepareTableModel(ifNotExistPresent =
false,
+ CarbonParserUtil.convertDbNameToLowerCase(dbName),
+ tableName,
+ fields.map(CarbonParserUtil.convertFieldNamesToLowercase),
+ Seq.empty,
+ scala.collection.mutable.Map.empty[String, String],
+ None,
+ isAlterFlow = true)
+ val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+ CarbonParserUtil.convertDbNameToLowerCase(dbName),
+ tableName,
+ Map.empty[String, String],
+ tableModel.dimCols,
+ tableModel.msrCols,
+ tableModel.highCardinalityDims.getOrElse(Seq.empty))
+ CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+ }
+
def changeColumn(
changeColumnCommand: AlterTableChangeColumnCommand,
sparkSession: SparkSession):
CarbonAlterTableColRenameDataTypeChangeCommand = {
@@ -223,32 +230,48 @@ object DDLHelper {
val newColumn = changeColumnCommand.newColumn
val newColumnMetaData = newColumn.metadata
val isColumnRename = !columnName.equalsIgnoreCase(newColumn.name)
- val values = newColumn.dataType match {
- case d: DecimalType => Some(List((d.precision, d.scale)))
- case _ => None
- }
- val dataTypeInfo = CarbonParserUtil.parseColumn(newColumn.name,
newColumn.dataType, values)
var newColumnComment: Option[String] = Option.empty
if (newColumnMetaData != null &&
- newColumnMetaData.contains(CarbonCommonConstants.COLUMN_COMMENT)) {
+ newColumnMetaData.contains(CarbonCommonConstants.COLUMN_COMMENT)) {
newColumnComment =
Some(newColumnMetaData.getString(CarbonCommonConstants.COLUMN_COMMENT))
}
- val alterTableColRenameAndDataTypeChangeModel =
- AlterTableDataTypeChangeModel(
- dataTypeInfo,
- tableName.database.map(_.toLowerCase),
- tableName.table.toLowerCase,
- columnName.toLowerCase,
- newColumn.name.toLowerCase,
- isColumnRename,
- newColumnComment)
-
- CarbonAlterTableColRenameDataTypeChangeCommand(
- alterTableColRenameAndDataTypeChangeModel
- )
+ prepareAlterTableColRenameDataTypeChangeCommand(newColumn,
+ tableName.database.map(_.toLowerCase),
+ tableName.table.toLowerCase,
+ columnName.toLowerCase,
+ isColumnRename,
+ newColumnComment)
+ }
+ }
+
+ def prepareAlterTableColRenameDataTypeChangeCommand(
+ newColumn: StructField,
+ dbName: Option[String],
+ tableName: String,
+ columnName: String,
+ isColumnRename: Boolean,
+ newColumnComment: Option[String]
+ ): CarbonAlterTableColRenameDataTypeChangeCommand = {
+ val values = newColumn.dataType match {
+ case d: DecimalType => Some(List((d.precision, d.scale)))
+ case _ => None
}
+ val dataTypeInfo = CarbonParserUtil.parseColumn(newColumn.name,
newColumn.dataType, values)
+ val alterTableColRenameAndDataTypeChangeModel =
+ AlterTableDataTypeChangeModel(
+ dataTypeInfo,
+ dbName,
+ tableName,
+ columnName,
+ newColumn.name.toLowerCase,
+ isColumnRename,
+ newColumnComment)
+
+ CarbonAlterTableColRenameDataTypeChangeCommand(
+ alterTableColRenameAndDataTypeChangeModel
+ )
}
def describeTable(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 7b7749d..4201097 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types._
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.{CarbonSchemaException,
MalformedCarbonCommandException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -101,7 +102,7 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
initialize
}
- private def initializeGloabalSort = {
+ private def initializeGlobalSort = {
val initframe = generateData(10)
initframe.write
.format("carbondata")
@@ -259,7 +260,7 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
test("test basic merge into the globalsort table") {
sql("drop table if exists order")
- val (dwSelframe, odsframe) = initializeGloabalSort
+ val (dwSelframe, odsframe) = initializeGlobalSort
val updateMap = Map("id" -> "A.id",
"name" -> "B.name",
@@ -718,22 +719,7 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("check the cdc with partition") {
- sql("drop table if exists target")
-
- val initframe = sqlContext.sparkSession.createDataFrame(Seq(
- Row("a", "0"),
- Row("b", "1"),
- Row("c", "2"),
- Row("d", "3")
- ).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
-
- initframe.write
- .format("carbondata")
- .option("tableName", "target")
- .option("partitionColumns", "value")
- .mode(SaveMode.Overwrite)
- .save()
- val target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+ val target = prepareTarget(isPartitioned = true, "value")
var cdc =
sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "10", false, 0),
@@ -748,6 +734,9 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
StructField("newValue", StringType),
StructField("deleted", BooleanType), StructField("time",
IntegerType))))
cdc.createOrReplaceTempView("changes")
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false"
+ )
cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as
deleted FROM ( SELECT " +
"key, max(struct(time, newValue, deleted)) as latest FROM
changes GROUP BY key)")
@@ -781,23 +770,7 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
}
test("test upsert APIs on partition table") {
- sql("drop table if exists target")
- val initframe = sqlContext.sparkSession.createDataFrame(Seq(
- Row("a", 0, "CHINA"),
- Row("b", 1, "INDIA"),
- Row("c", 2, "INDIA"),
- Row("d", 3, "US")
- ).asJava,
- StructType(Seq(StructField("key", StringType),
- StructField("value", IntegerType),
- StructField("country", StringType))))
- initframe.write
- .format("carbondata")
- .option("tableName", "target")
- .option("partitionColumns", "country")
- .mode(SaveMode.Overwrite)
- .save()
- val target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+ val target = prepareTargetWithThreeFields(isPartitioned = true, "country")
var cdc =
sqlContext.sparkSession.createDataFrame(Seq(
Row("a", 7, "CHINA"),
@@ -847,20 +820,222 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
Row("j", 2, "RUSSIA"), Row("k", 0, "INDIA")))
}
- test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") {
+ def prepareTarget(
+ isPartitioned: Boolean = false,
+ partitionedColumn: String = null
+ ): Dataset[Row] = {
sql("drop table if exists target")
- val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+ val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "0"),
Row("b", "1"),
Row("c", "2"),
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
- initframe.write
- .format("carbondata")
- .option("tableName", "target")
- .mode(SaveMode.Overwrite)
- .save()
- val target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+
+ if (isPartitioned) {
+ initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .option("partitionColumns", partitionedColumn)
+ .mode(SaveMode.Overwrite)
+ .save()
+ } else {
+ initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+ sqlContext.read.format("carbondata").option("tableName", "target").load()
+ }
+
+ def prepareTargetWithThreeFields(
+ isPartitioned: Boolean = false,
+ partitionedColumn: String = null
+ ): Dataset[Row] = {
+ sql("drop table if exists target")
+ val initFrame = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 0, "CHINA"),
+ Row("b", 1, "INDIA"),
+ Row("c", 2, "INDIA"),
+ Row("d", 3, "US")
+ ).asJava,
+ StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType),
+ StructField("country", StringType))))
+
+ if (isPartitioned) {
+ initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .option("partitionColumns", partitionedColumn)
+ .mode(SaveMode.Overwrite)
+ .save()
+ } else {
+ initFrame.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ }
+ sqlContext.read.format("carbondata").option("tableName", "target").load()
+ }
+
+ test("test schema enforcement") {
+ val target = prepareTarget()
+ var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "1", "ab"),
+ Row("d", "4", "de")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", StringType)
+ , StructField("new_value", StringType))))
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true"
+ )
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", "1"), Row("b", "1"), Row("c", "2"), Row("d", "4")))
+
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "true"
+ )
+
+ val exceptionCaught1 = intercept[MalformedCarbonCommandException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1, "ab"),
+ Row("d", 4, "de")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType)
+ , StructField("new_value", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+ assert(exceptionCaught1.getMessage
+ .contains(
+ "property CARBON_STREAMER_INSERT_DEDUPLICATE should " +
+ "only be set with operation type INSERT"))
+
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
+ val exceptionCaught2 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1),
+ Row("d", 4)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("val", IntegerType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+ assert(exceptionCaught2.getMessage.contains("source schema does not
contain field: value"))
+
+ val exceptionCaught3 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1L),
+ Row("d", 4L)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", LongType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+
+ assert(exceptionCaught3.getMessage.contains("source schema has different "
+
+ "data type for field: value"))
+
+ val exceptionCaught4 = intercept[CarbonSchemaException] {
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "1", "A"),
+ Row("d", "4", "D")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", StringType), StructField("Key", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ }
+
+ assert(exceptionCaught4.getMessage.contains("source schema has similar
fields which " +
+ "differ only in case
sensitivity: key"))
+ }
+
+ test("test schema evolution") {
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value"
+ )
+ sql("drop table if exists target")
+ var target = prepareTargetWithThreeFields()
+ var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 1, "ab", "china"),
+ Row("d", 4, "de", "china"),
+ Row("d", 7, "updated_de", "china_pro")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType)
+ , StructField("new_value", StringType),
+ StructField("country", StringType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", 1, "china", "ab"), Row("b", 1, "INDIA", null),
+ Row("c", 2, "INDIA", null), Row("d", 7, "china_pro", "updated_de")))
+
+ target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", 5),
+ Row("d", 5)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", IntegerType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", 5), Row("b", 1),
+ Row("c", 2), Row("d", 5)))
+
+ target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+ cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("b", 50L),
+ Row("d", 50L)
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", LongType))))
+ target.as("A").upsert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", 5), Row("b", 50L),
+ Row("c", 2), Row("d", 50L)))
+ }
+
+ test("test deduplication with existing dataset") {
+ val target = prepareTarget()
+ var cdc = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "1", "ab"),
+ Row("e", "4", "de"),
+ Row("e", "6", "de1")
+ ).asJava, StructType(Seq(StructField("key", StringType),
+ StructField("value", StringType)
+ , StructField("new_value", StringType))))
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "true"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD, "value"
+ )
+ properties.addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "true"
+ )
+ target.as("A").insert(cdc.as("B"), "key").execute()
+ checkAnswer(sql("select * from target"),
+ Seq(Row("a", "0"), Row("b", "1"),
+ Row("c", "2"), Row("d", "3"), Row("e", "6")))
+ }
+
+ test("test all the merge APIs UPDATE, DELETE, UPSERT and INSERT") {
+ val target = prepareTarget()
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE, "false"
+ )
var cdc =
sqlContext.sparkSession.createDataFrame(Seq(
Row("a", "7"),
@@ -976,6 +1151,9 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT, "false"
+ )
initframe.repartition(1).write
.format("carbondata")
.option("tableName", "target")