akashrn5 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r670689968
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
##########
@@ -780,6 +780,68 @@ class MergeTestCase extends QueryTest with
BeforeAndAfterAll {
Seq(Row("c", "200"), Row("e", "100")))
}
+ test("test new API") {
Review comment:
updated and added test cases
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeUtil.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{ExecutionErrors,
UpdateTableModel}
+import
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+import org.apache.spark.sql.execution.command.mutation.DeleteExecution
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil,
SegmentUpdateDetails}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+
+
+object MergeUtil {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ def triggerAction(sparkSession: SparkSession,
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors,
UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,
+ frame: DataFrame,
+ targetCarbonTable: CarbonTable,
+ stats: Stats,
+ srcDS: DataFrame) {
+
+ protected def performTagging: (RDD[Row], String) = {
+ val tupleId = frame.queryExecution.analyzed.output.zipWithIndex
+
.find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+ val schema =
+ org.apache.spark.sql.types.StructType(Seq(
+ StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
StringType)))
+ val job = CarbonSparkUtil.createHadoopJob()
+ job.setOutputKeyClass(classOf[Void])
+ job.setOutputValueClass(classOf[InternalRow])
+ val insertedRows = stats.insertedRows
+ val updatedRows = stats.updatedRows
+ val uuid = UUID.randomUUID.toString
+ job.setJobID(new JobID(uuid, 0))
+ val path = targetCarbonTable.getTablePath +
CarbonCommonConstants.FILE_SEPARATOR + "avro"
+ FileOutputFormat.setOutputPath(job, new Path(path))
+ val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job,
schema)
+ val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext,
job.getConfiguration)
+ frame.queryExecution.toRdd.mapPartitionsWithIndex { case (index, iterator)
=>
+ val confB = config.value.value
+ val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+ val attemptID = new TaskAttemptID(task, index)
+ val context = new TaskAttemptContextImpl(confB, attemptID)
+ val writer = factory.newInstance(path +
CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+ schema, context)
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ if (iterator.hasNext) {
+ true
+ } else {
+ writer.close()
+ false
+ }
+ }
+
+ override def next(): InternalRow = {
+ val row = iterator.next()
+ val newArray = new Array[Any](1)
+ val tupleID = row.getUTF8String(tupleId)
+ if (tupleID == null) {
+ insertedRows.add(1)
+ } else {
+ newArray(0) = tupleID
+ writer.write(new GenericInternalRow(newArray))
+ updatedRows.add(1)
+ }
+ null
+ }
+ }
+ }.count()
+ val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, path)
+ (deltaRdd, path)
+ }
+
+ protected def triggerAction(factTimestamp: Long,
+ executorErrors: ExecutionErrors,
+ deltaRdd: RDD[Row],
+ deltaPath: String): (util.List[SegmentUpdateDetails], Seq[Segment]) = {
+ val tuple = MergeUtil.triggerAction(sparkSession,
+ targetCarbonTable,
+ factTimestamp,
+ executorErrors,
+ deltaRdd)
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+ MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
factTimestamp, tuple)
+ tuple
+ }
+
+ protected def insertDataToTargetTable(updateTableModel:
Option[UpdateTableModel]): Seq[Row] = {
+ val tableCols =
+ targetCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+ val header = tableCols.mkString(",")
+ val dataFrame = srcDS.select(tableCols.map(col): _*)
+ MergeUtil.insertDataToTargetTable(sparkSession,
+ targetCarbonTable,
+ header,
+ updateTableModel,
+ dataFrame)
+ }
+
+ protected def tryHorizontalCompaction(): Unit = {
+ // Do IUD Compaction.
+ HorizontalCompaction.tryHorizontalCompaction(
+ sparkSession, targetCarbonTable)
+ }
+
+ def handleMerge()
+}
+
+case class UpdateHandler(sparkSession: SparkSession,
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeHandler.scala
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.command.{ExecutionErrors,
UpdateTableModel}
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * This class handles the merge actions of UPSERT, UPDATE, DELETE, INSERT
+ */
+abstract class MergeHandler(sparkSession: SparkSession,
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+ /**
+ * This method reads the splits and makes a (blockPath, (min, max)) tuple to do
min max pruning of
+ * the src dataset
+ *
+ * @param colTosplitsFilePathAndMinMaxMap CarbonInputSplit whose min max
cached in driver or
+ * the index server
+ * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath
and min max of all the
+ * join columns involved
+ */
+ def addFilePathAndMinMaxTuples(
+ colTosplitsFilePathAndMinMaxMap: mutable.Map[String,
util.List[FilePathMinMaxVO]],
+ carbonTable: CarbonTable,
+ joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn,
SerializableComparator],
+ fileMinMaxMapListOfAllJoinColumns:
mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+ CarbonColumn)]): Unit = {
+ joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+ val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+ collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+ val joinDataType = joinColumn.getDataType
+ val isDimension = joinColumn.isDimension
+ val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType)
&&
+ (joinDataType != DataTypes.DATE)
+ colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+ filePathMinMiax =>
+ val filePath = filePathMinMiax.getFilePath
+ val minBytes = filePathMinMiax.getMin
+ val maxBytes = filePathMinMiax.getMax
+ val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+ // While data loading to SI created on Partition table, on
+ // partition directory, '/' will be
+ // replaced with '#', to support multi level partitioning. For
example, BlockId will
+ // look like `part1=1#part2=2/xxxxxxxxx`. During query also,
blockId should be
+ // replaced by '#' in place of '/', to match and prune data on SI
table.
+ CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+ filePath,
+ "",
+ true,
+ false,
+ true)
+ } else {
+ filePath.substring(filePath.lastIndexOf("/Part") + 1)
+ }
+ if (isDimension) {
+ if (isPrimitiveAndNotDate) {
+ val minValue =
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+ joinDataType)
+ val maxValue =
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+ joinDataType)
+ // check here if present in map, if it is, compare and update
min and max
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin =
+ comparator.compare(fileMinMaxMap(uniqBlockPath)._1,
minValue) > 0
+ val isMaxMoreThanMax =
+ comparator.compare(maxValue,
fileMinMaxMap(uniqBlockPath)._2) > 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ minValue,
+ maxValue,
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMax)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+ }
+ } else {
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+
.compareTo(fileMinMaxMap(uniqBlockPath)._1
+ .asInstanceOf[String].getBytes(),
minBytes) > 0
+ val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(maxBytes,
fileMinMaxMap(uniqBlockPath)._2
+ .asInstanceOf[String].getBytes()) > 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ new String(minBytes),
+ new String(maxBytes),
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMax)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new
String(maxBytes)))
+ }
+ }
+ } else {
+ val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes,
joinDataType)
+ val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes,
joinDataType)
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin =
+ comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue)
> 0
+ val isMaxMoreThanMin =
+ comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2)
> 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ minValue,
+ maxValue,
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMin)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+ }
+ }
+ }
+ fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+ }
+ }
+
+ /**
+ * This method updates the min max map of the block if the value is less
than min or more
+ * than max
+ */
+ private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap:
mutable.Map[String, (AnyRef, AnyRef)],
+ minValue: AnyRef,
+ maxValue: AnyRef,
+ uniqBlockPath: String,
+ isMinLessThanMin: Boolean,
+ isMaxMoreThanMin: Boolean): Unit = {
+ (isMinLessThanMin, isMaxMoreThanMin) match {
+ case (true, true) => fileMinMaxMap(uniqBlockPath) = (minValue, maxValue)
+ case (true, false) => fileMinMaxMap(uniqBlockPath) = (minValue,
+ fileMinMaxMap(uniqBlockPath)._2)
+ case (false, true) => fileMinMaxMap(uniqBlockPath) =
(fileMinMaxMap(uniqBlockPath)._1,
+ maxValue)
+ case _ =>
+ }
+ }
+
+ /**
+ * This method returns the partitions required to scan in the target table
based on the
+ * partitions present in the src table or dataset
+ */
+ def getPartitionSpecToConsiderForPruning(sparkSession: SparkSession,
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+ /**
+ * This method reads the splits and makes a (blockPath, (min, max)) tuple to do
min max pruning of
+ * the src dataset
+ *
+ * @param colTosplitsFilePathAndMinMaxMap CarbonInputSplit whose min max
cached in driver or
+ * the index server
+ * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath
and min max of all the
+ * join columns involved
+ */
+ def addFilePathAndMinMaxTuples(
+ colTosplitsFilePathAndMinMaxMap: mutable.Map[String,
util.List[FilePathMinMaxVO]],
+ carbonTable: CarbonTable,
+ joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn,
SerializableComparator],
+ fileMinMaxMapListOfAllJoinColumns:
mutable.ArrayBuffer[(mutable.Map[String, (AnyRef, AnyRef)],
+ CarbonColumn)]): Unit = {
+ joinColumnsToComparatorMap.foreach { case (joinColumn, comparator) =>
+ val fileMinMaxMap: mutable.Map[String, (AnyRef, AnyRef)] =
+ collection.mutable.Map.empty[String, (AnyRef, AnyRef)]
+ val joinDataType = joinColumn.getDataType
+ val isDimension = joinColumn.isDimension
+ val isPrimitiveAndNotDate = DataTypeUtil.isPrimitiveColumn(joinDataType)
&&
+ (joinDataType != DataTypes.DATE)
+ colTosplitsFilePathAndMinMaxMap(joinColumn.getColName).asScala.foreach {
+ filePathMinMiax =>
+ val filePath = filePathMinMiax.getFilePath
+ val minBytes = filePathMinMiax.getMin
+ val maxBytes = filePathMinMiax.getMax
+ val uniqBlockPath = if (carbonTable.isHivePartitionTable) {
+ // While data loading to SI created on Partition table, on
+ // partition directory, '/' will be
+ // replaced with '#', to support multi level partitioning. For
example, BlockId will
+ // look like `part1=1#part2=2/xxxxxxxxx`. During query also,
blockId should be
+ // replaced by '#' in place of '/', to match and prune data on SI
table.
+ CarbonUtil.getBlockId(carbonTable.getAbsoluteTableIdentifier,
+ filePath,
+ "",
+ true,
+ false,
+ true)
+ } else {
+ filePath.substring(filePath.lastIndexOf("/Part") + 1)
+ }
+ if (isDimension) {
+ if (isPrimitiveAndNotDate) {
+ val minValue =
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minBytes,
+ joinDataType)
+ val maxValue =
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxBytes,
+ joinDataType)
+ // check here if present in map, if it is, compare and update
min and max
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin =
+ comparator.compare(fileMinMaxMap(uniqBlockPath)._1,
minValue) > 0
+ val isMaxMoreThanMax =
+ comparator.compare(maxValue,
fileMinMaxMap(uniqBlockPath)._2) > 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ minValue,
+ maxValue,
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMax)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+ }
+ } else {
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin = ByteUtil.UnsafeComparer.INSTANCE
+
.compareTo(fileMinMaxMap(uniqBlockPath)._1
+ .asInstanceOf[String].getBytes(),
minBytes) > 0
+ val isMaxMoreThanMax = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(maxBytes,
fileMinMaxMap(uniqBlockPath)._2
+ .asInstanceOf[String].getBytes()) > 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ new String(minBytes),
+ new String(maxBytes),
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMax)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (new String(minBytes), new
String(maxBytes)))
+ }
+ }
+ } else {
+ val maxValue = DataTypeUtil.getMeasureObjectFromDataType(maxBytes,
joinDataType)
+ val minValue = DataTypeUtil.getMeasureObjectFromDataType(minBytes,
joinDataType)
+ if (fileMinMaxMap.contains(uniqBlockPath)) {
+ val isMinLessThanMin =
+ comparator.compare(fileMinMaxMap(uniqBlockPath)._1, minValue)
> 0
+ val isMaxMoreThanMin =
+ comparator.compare(maxValue, fileMinMaxMap(uniqBlockPath)._2)
> 0
+ updateMapIfRequiredBasedOnMinMax(fileMinMaxMap,
+ minValue,
+ maxValue,
+ uniqBlockPath,
+ isMinLessThanMin,
+ isMaxMoreThanMin)
+ } else {
+ fileMinMaxMap += (uniqBlockPath -> (minValue, maxValue))
+ }
+ }
+ }
+ fileMinMaxMapListOfAllJoinColumns += ((fileMinMaxMap, joinColumn))
+ }
+ }
+
+ /**
+ * This method updates the min max map of the block if the value is less
than min or more
+ * than max
+ */
+ private def updateMapIfRequiredBasedOnMinMax(fileMinMaxMap:
mutable.Map[String, (AnyRef, AnyRef)],
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.mutate.FilePathMinMaxVO
+import org.apache.carbondata.core.util.{ByteUtil, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.comparator.SerializableComparator
+
+/**
+ * The utility class for Merge operations
+ */
+object CarbonMergeDataSetUtil {
+
+ /**
+ * This method reads the splits and makes a (blockPath, (min, max)) tuple to do
min max pruning of
+ * the src dataset
+ *
+ * @param colTosplitsFilePathAndMinMaxMap CarbonInputSplit whose min max
cached in driver or
+ * the index server
+ * @param fileMinMaxMapListOfAllJoinColumns collection to hold the filepath
and min max of all the
+ * join columns involved
+ */
+ def addFilePathAndMinMaxTuples(
+ colTosplitsFilePathAndMinMaxMap: mutable.Map[String,
util.List[FilePathMinMaxVO]],
+ carbonTable: CarbonTable,
+ joinColumnsToComparatorMap: mutable.LinkedHashMap[CarbonColumn,
SerializableComparator],
Review comment:
updated
##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
##########
@@ -81,6 +82,7 @@ class CarbonEnv {
sparkSession.udf.register("getTupleId", () => "")
sparkSession.udf.register("getPositionId", () => "")
+ sparkSession.udf.register("block_paths", new BlockPathsUDF)
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
##########
@@ -89,7 +89,7 @@ class DistributedIndexJob extends AbstractIndexJob {
}
client.getSplits(indexFormat)
.getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath,
indexFormat
- .getQueryId, indexFormat.isCountStarJob)
+ .getQueryId, indexFormat.isCountStarJob, null)
Review comment:
refactored
##########
File path:
integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
##########
@@ -427,6 +427,6 @@ object DistributedRDDUtils {
new java.util.ArrayList(),
new java.util.ArrayList())
new ExtendedBlockletWrapper(blocklets,
request.getCarbonTable.getTablePath, request.getQueryId,
- request.isWriteToFile, request.isCountStarJob)
+ request.isWriteToFile, request.isCountStarJob, request.getCdcVO)
Review comment:
refactored
##########
File path:
examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {
+
+ def main(args: Array[String]): Unit = {
+ val spark = ExampleUtils.createSparkSession("DataUPSERTExample")
+ performUPSERT(spark)
+ }
+
+ def performUPSERT(spark: SparkSession): Unit = {
+ spark.sql("drop table if exists target")
+ val initframe = spark.createDataFrame(Seq(
+ Row("a", "0"),
+ Row("b", "1"),
+ Row("c", "2"),
+ Row("d", "3")
+ ).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ val target = spark.read.format("carbondata").option("tableName",
"target").load()
+ var cdc =
+ spark.createDataFrame(Seq(
+ Row("a", "7"),
+ Row("b", null),
+ Row("g", null),
+ Row("e", "3")
+ ).asJava,
Review comment:
the API takes the java list
##########
File path:
examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.CarbonSession.DataSetMerge
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * Example for UPSERT APIs
+ */
+object DataUPSERTExample {
Review comment:
done
##########
File path:
core/src/main/java/org/apache/carbondata/core/range/BlockMinMaxTree.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.range;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+import static
org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET;
+
+/**
+ * This class prepares a tree for pruning using min-max of block
+ */
+public class BlockMinMaxTree implements Serializable {
+
+ private MinMaxNode root;
+
+ private final boolean isPrimitiveAndNotDate;
+ private final boolean isDimensionColumn;
+ private final DataType joinDataType;
+ private final SerializableComparator comparator;
+
+ public BlockMinMaxTree(boolean isPrimitiveAndNotDate, boolean
isDimensionColumn,
+ DataType joinDataType, SerializableComparator comparator) {
+ this.isPrimitiveAndNotDate = isPrimitiveAndNotDate;
+ this.isDimensionColumn = isDimensionColumn;
+ this.joinDataType = joinDataType;
+ this.comparator = comparator;
+ }
+
+ public MinMaxNode getRoot() {
+ return root;
+ }
+
+ public void insert(MinMaxNode newMinMaxNode) {
+ root = insert(getRoot(), newMinMaxNode);
+ }
+
+ MinMaxNode insert(MinMaxNode root, MinMaxNode newMinMaxNode) {
Review comment:
changed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]