This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c20660f [CARBONDATA-3597] Missed commit from PR-3483 (SCD2)
c20660f is described below
commit c20660fa7ceed4ebe7a7fa465d4909474079eab9
Author: ravipesala <[email protected]>
AuthorDate: Sat Jan 4 00:26:50 2020 +0800
[CARBONDATA-3597] Missed commit from PR-3483 (SCD2)
A commit was missed during the merge of PR #3483.
Why is this PR needed?
SCD can use the insert flow to write data instead of calling the writer
directly.
What changes were proposed in this PR?
Use the insert flow for SCD to write data.
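For illustration, a minimal sketch of the merge flow this enables, mirroring the
"ccd" test case added in this commit (the spark session, table names and column
names are illustrative, and the implicit merge API from the CarbonData Spark
integration is assumed to be in scope):

  // Sketch only: "target"/"changes" tables, the "deleted" column and the
  // spark session are illustrative, following the new MergeTestCase tests.
  val target = spark.read.format("carbondata").option("tableName", "target").load()
  val changes = spark.read.format("carbondata").option("tableName", "changes").load()

  val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
  val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

  // With this change the merged rows are written through the normal insert flow
  // (as a new segment) rather than handed to the fact data writer directly.
  target.as("A").merge(changes.as("B"), "A.key=B.key").
    whenMatched("B.deleted=false").
    updateExpr(updateMap).
    whenNotMatched("B.deleted=false").
    insertExpr(insertMap).
    whenMatched("B.deleted=true").
    delete().execute()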
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes.
This closes #3704
---
.../impl/DictionaryBasedResultCollector.java | 3 +-
.../statusmanager/SegmentUpdateStatusManager.java | 2 +
.../carbondata/core/util/path/HDFSLeaseUtils.java | 214 -----------------
.../spark/rdd/CarbonDataRDDFactory.scala | 23 +-
.../spark/rdd/CarbonDeltaRowScanRDD.scala | 4 +-
.../command/carbonTableSchemaCommon.scala | 3 +-
.../management/CarbonInsertIntoWithDf.scala | 8 +-
.../command/management/CommonLoadUtils.scala | 19 +-
.../mutation/merge/CarbonMergeDataSetCommand.scala | 253 +++++++++++----------
.../mutation/merge/HistoryTableLoadHelper.scala | 24 +-
.../command/mutation/merge/MergeProjection.scala | 40 +---
.../spark/testsuite/merge/MergeTestCase.scala | 112 +++++++++
.../store/writer/AbstractFactDataWriter.java | 1 +
13 files changed, 315 insertions(+), 391 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index e6d0528..fb88028 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -135,7 +135,8 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
while (scannedResult.hasNext() && rowCounter < batchSize) {
scannedResult.incrementCounter();
if (readOnlyDelta) {
- if (!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+ if (!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId()) &&
+ scannedResult.getCurrentDeleteDeltaVo() != null) {
continue;
}
} else {
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6327781..f62382d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -127,9 +127,11 @@ public class SegmentUpdateStatusManager {
HashSet<String> set = new HashSet<>();
set.add(updateVersion);
updateDetail.setDeltaFileStamps(set);
+ updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
newupdateDetails.add(updateDetail);
}
} else if (updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) {
+ updateDetail.setSegmentStatus(SegmentStatus.SUCCESS);
newupdateDetails.add(updateDetail);
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
deleted file mode 100644
index 3058685..0000000
--- a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.util.path;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.viewfs.ViewFileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-import org.apache.log4j.Logger;
-
-/**
- * Implementation for HDFS utility methods
- */
-public class HDFSLeaseUtils {
-
- /**
- * LOGGER
- */
- private static final Logger LOGGER =
- LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName());
-
- /**
- * This method will validate whether the exception thrown if for lease recovery from HDFS
- *
- * @param message
- * @return
- */
- public static boolean checkExceptionMessageForLeaseRecovery(String message) {
- // depending on the scenario few more cases can be added for validating lease recovery exception
- if (null != message && message.contains("Failed to APPEND_FILE")) {
- return true;
- }
- return false;
- }
-
- /**
- * This method will make attempts to recover lease on a file using the
- * distributed file system utility.
- *
- * @param filePath
- * @return
- * @throws IOException
- */
- public static boolean recoverFileLease(String filePath) throws IOException {
- LOGGER.info("Trying to recover lease on file: " + filePath);
- FileFactory.FileType fileType = FileFactory.getFileType(filePath);
- switch (fileType) {
- case ALLUXIO:
- case HDFS:
- case S3:
- Path path = FileFactory.getPath(filePath);
- FileSystem fs = FileFactory.getFileSystem(path);
- return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
- case VIEWFS:
- path = FileFactory.getPath(filePath);
- fs = FileFactory.getFileSystem(path);
- ViewFileSystem viewFileSystem = (ViewFileSystem) fs;
- Path targetFileSystemPath = viewFileSystem.resolvePath(path);
- FileSystem targetFileSystem = FileFactory.getFileSystem(targetFileSystemPath);
- if (targetFileSystem instanceof DistributedFileSystem) {
- return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) targetFileSystem);
- } else {
- LOGGER.error(
- "Invalid file type. Lease recovery is not supported on
filesystem with file: "
- + filePath);
- return false;
- }
- default:
- LOGGER.error("Invalid file type. Lease recovery is not supported on
filesystem with file: "
- + filePath);
- return false;
- }
- }
-
- /**
- * Recovers lease on a file
- *
- * @param filePath
- * @param path
- * @param fs
- * @return
- * @throws IOException
- */
- private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs)
- throws IOException {
- int maxAttempts = getLeaseRecoveryRetryCount();
- int retryInterval = getLeaseRecoveryRetryInterval();
- boolean leaseRecovered = false;
- IOException ioException = null;
- for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
- try {
- leaseRecovered = fs.recoverLease(path);
- if (!leaseRecovered) {
- try {
- LOGGER.info(
- "Failed to recover lease after attempt " + retryCount + " .
Will try again after "
- + retryInterval + " ms...");
- Thread.sleep(retryInterval);
- } catch (InterruptedException e) {
- LOGGER.error(
- "Interrupted exception occurred while recovering lease for
file : " + filePath, e);
- }
- }
- } catch (IOException e) {
- if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
- LOGGER.error("The given file does not exist at path " + filePath);
- throw e;
- } else if (e instanceof FileNotFoundException) {
- LOGGER.error("The given file does not exist at path " + filePath);
- throw e;
- } else {
- LOGGER.error("Recover lease threw exception : " + e.getMessage(), e);
- ioException = e;
- }
- }
- LOGGER.info("Retrying again after interval of " + retryInterval + "
ms...");
- }
- if (leaseRecovered) {
- LOGGER.info("Successfully able to recover lease on file: " + filePath);
- return true;
- } else {
- LOGGER.error(
- "Failed to recover lease on file: " + filePath + " after retrying
for " + maxAttempts
- + " at an interval of " + retryInterval);
- if (null != ioException) {
- throw ioException;
- } else {
- return false;
- }
- }
- }
-
- private static int getLeaseRecoveryRetryCount() {
- String retryMaxAttempts = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
- int retryCount = 0;
- try {
- retryCount = Integer.parseInt(retryMaxAttempts);
- if (retryCount < CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN
- || retryCount > CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX) {
- retryCount = Integer.parseInt(
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
- LOGGER.warn(
- String.format("value configured for %s is not in allowed range.
Allowed range " +
- "is >= %d and <= %d. Therefore considering default value:
%d",
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN,
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX,
- retryCount
- ));
- }
- } catch (NumberFormatException ne) {
- retryCount = Integer.parseInt(
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
- LOGGER.warn("value configured for " +
CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT
- + " is incorrect. Therefore considering default value: " +
retryCount);
- }
- return retryCount;
- }
-
- private static int getLeaseRecoveryRetryInterval() {
- String retryMaxAttempts = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL,
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
- int retryCount = 0;
- try {
- retryCount = Integer.parseInt(retryMaxAttempts);
- if (retryCount < CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN
- || retryCount > CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX) {
- retryCount = Integer.parseInt(
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
- LOGGER.warn(
- "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
- + " is not in allowed range. Allowed range is >="
- + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN + " and <="
- + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX
- + ". Therefore considering default value (ms): " + retryCount);
- }
- } catch (NumberFormatException ne) {
- retryCount = Integer.parseInt(
- CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
- LOGGER.warn(
- "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
- + " is incorrect. Therefore considering default value (ms): " + retryCount);
- }
- return retryCount;
- }
-
-}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8709d11..d8f67d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -325,7 +325,8 @@ object CarbonDataRDDFactory {
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
// create new segment folder in carbon store
- if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
+ if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable ||
+ updateModel.isDefined && updateModel.get.loadAsNewSegment) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
}
var loadStatus = SegmentStatus.SUCCESS
@@ -339,7 +340,7 @@ object CarbonDataRDDFactory {
try {
if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
- if (updateModel.isDefined) {
+ if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
res = loadDataFrameForUpdate(
sqlContext,
dataFrame,
@@ -484,7 +485,7 @@ object CarbonDataRDDFactory {
segmentLock.unlock()
}
// handle the status file updation for the update cmd.
- if (updateModel.isDefined) {
+ if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
return null
@@ -615,6 +616,7 @@ object CarbonDataRDDFactory {
newEntryLoadStatus,
overwriteTable,
segmentFileName,
+ updateModel,
uniqueTableStatusId)
val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
new LoadTablePostStatusUpdateEvent(carbonLoadModel)
@@ -990,6 +992,7 @@ object CarbonDataRDDFactory {
newEntryLoadStatus: SegmentStatus,
overwriteTable: Boolean,
segmentFileName: String,
+ updateModel: Option[UpdateTableModel],
uuid: String = ""): (Boolean, LoadMetadataDetails) = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
@@ -1009,7 +1012,19 @@ object CarbonDataRDDFactory {
if (!carbonLoadModel.isCarbonTransactionalTable && overwriteTable) {
CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(carbonLoadModel)
}
- val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
+ var done = true
+ // If the updated data should be added as new segment then update the segment information
+ if (updateModel.isDefined && updateModel.get.loadAsNewSegment) {
+ done = done && CarbonUpdateUtil.updateTableMetadataStatus(
+ carbonLoadModel.getLoadMetadataDetails.asScala.map(l =>
+ new Segment(l.getMergedLoadName,
+ l.getSegmentFile)).toSet.asJava,
+ carbonTable,
+ carbonLoadModel.getFactTimeStamp.toString,
+ true,
+ updateModel.get.deletedSegments.asJava)
+ }
+ done = done && CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
overwriteTable, uuid)
if (!done) {
val errorMessage = s"Dataload failed due to failure in table status
updation for" +
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
index 53b0a5a..d1e341f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -80,7 +80,9 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
}.asJava
new CarbonSparkPartition(partition.rddId, partition.index,
new CarbonMultiBlockSplit(splits, partition.multiBlockSplit.getLocations))
- }.filter(p => p.multiBlockSplit.getAllSplits.size() > 0).asInstanceOf[Array[Partition]]
+ }.filter(p => p.multiBlockSplit.getAllSplits.size() > 0).zipWithIndex.map{ case (p, index) =>
+ new CarbonSparkPartition(p.rddId, index, p.multiBlockSplit)
+ }.asInstanceOf[Array[Partition]]
}
override def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 63044ab..f52f566 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -120,7 +120,8 @@ case class UpdateTableModel(
isUpdate: Boolean,
updatedTimeStamp: Long,
var executorErrors: ExecutionErrors,
- deletedSegments: Seq[Segment])
+ deletedSegments: Seq[Segment],
+ loadAsNewSegment: Boolean = false)
case class CompactionModel(compactionSize: Long,
compactionType: CompactionType,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
index 5255b26..820971e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
@@ -110,7 +110,11 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
// Clean up the old invalid segment data before creating a new entry for new load.
SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
// add the start entry for the new load in the table status file
- if (updateModel.isEmpty && !table.isHivePartitionTable) {
+ if ((updateModel.isEmpty || updateModel.isDefined && updateModel.get.loadAsNewSegment)
+ && !table.isHivePartitionTable) {
+ if (updateModel.isDefined ) {
+ carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
+ }
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
carbonLoadModel,
isOverwriteTable)
@@ -182,7 +186,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
var rows = Seq.empty[Row]
- val loadDataFrame = if (updateModel.isDefined) {
+ val loadDataFrame = if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
Some(CommonLoadUtils.getDataFrameWithTupleID(Some(dataFrame)))
} else {
Some(dataFrame)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index a7cc48a..8a86388 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -879,16 +879,17 @@ object CommonLoadUtils {
try {
val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
loadParams.scanResultRDD.isDefined) {
- val (rdd, dfAttributes) = if (loadParams.updateModel.isDefined) {
- // Get the updated query plan in case of update scenario
- val updatedFrame = Dataset.ofRows(
- loadParams.sparkSession,
- getLogicalQueryForUpdate(
+ val (rdd, dfAttributes) =
+ if (loadParams.updateModel.isDefined && !loadParams.updateModel.get.loadAsNewSegment) {
+ // Get the updated query plan in case of update scenario
+ val updatedFrame = Dataset.ofRows(
loadParams.sparkSession,
- catalogTable,
- loadParams.dataFrame.get,
- loadParams.carbonLoadModel))
- (updatedFrame.rdd, updatedFrame.schema)
+ getLogicalQueryForUpdate(
+ loadParams.sparkSession,
+ catalogTable,
+ loadParams.dataFrame.get,
+ loadParams.carbonLoadModel))
+ (updatedFrame.rdd, updatedFrame.schema)
} else {
if (loadParams.finalPartition.nonEmpty) {
val headers = loadParams.carbonLoadModel
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index b16d3c3..4e6956b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -16,38 +16,40 @@
*/
package org.apache.spark.sql.execution.command.mutation.merge
+import java.util
import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericRowWithSchema}
+import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors}
+import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
-import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
* This command will merge the data of source dataset to target dataset backed by carbon table.
@@ -63,6 +65,8 @@ case class CarbonMergeDataSetCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ private val status_on_mergeds = "status_on_mergeds"
+
/**
* It merge the data of source dataset to target dataset backed by carbon table. Basically it
* makes the full outer join with both source and target and apply the given conditions as "case
@@ -104,31 +108,35 @@ case class CarbonMergeDataSetCommand(
.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
.withColumn("exist_on_target", lit(1))
.join(srcDS.withColumn("exist_on_src", lit(1)), mergeMatches.joinExpr, joinType)
- .withColumn("status", condition)
+ .withColumn(status_on_mergeds, condition)
if (LOGGER.isDebugEnabled) {
frame.explain()
}
val tableCols =
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
- val builder = new CarbonLoadModelBuilder(carbonTable)
- val options = Seq(("fileheader", tableCols.mkString(","))).toMap
- val model = builder.build(options.asJava, CarbonUpdateUtil.readCurrentTime, "1")
- model.setLoadWithoutConverterStep(true)
- val newLoadMetaEntry = new LoadMetadataDetails
- CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
- SegmentStatus.INSERT_IN_PROGRESS,
- model.getFactTimeStamp,
- false)
- CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false)
-
- model.setCsvHeader(tableCols.mkString(","))
+ val header = tableCols.mkString(",")
val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m =>
m.getActions.map {
- case u: UpdateAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, u)
- case i: InsertAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, i)
- case d: DeleteAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, d)
+ case u: UpdateAction => MergeProjection(tableCols,
+ status_on_mergeds,
+ frame,
+ rltn.head,
+ sparkSession,
+ u)
+ case i: InsertAction => MergeProjection(tableCols,
+ status_on_mergeds,
+ frame,
+ rltn.head,
+ sparkSession,
+ i)
+ case d: DeleteAction => MergeProjection(tableCols,
+ status_on_mergeds,
+ frame,
+ rltn.head,
+ sparkSession,
+ d)
case _ => null
}.filter(_ != null)
}
@@ -138,69 +146,64 @@ case class CarbonMergeDataSetCommand(
val stats = Stats(createLongAccumalator("insertedRows"),
createLongAccumalator("updatedRows"),
createLongAccumalator("deletedRows"))
- val processedRDD = processIUD(sparkSession, frame, carbonTable, model, projections, stats)
+ val targetSchema = StructType(tableCols.map { f =>
+ rltn.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
+ } ++ Seq(StructField(status_on_mergeds, IntegerType)))
+ val (processedRDD, deltaPath) = processIUD(sparkSession, frame, carbonTable, projections,
+ targetSchema, stats)
val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
- val trxMgr = TranxManager(model.getFactTimeStamp)
+ val trxMgr = TranxManager(System.currentTimeMillis())
val mutationAction = MutationActionFactory.getMutationAction(sparkSession,
carbonTable, hasDelAction, hasUpdateAction,
insertHistOfUpdate, insertHistOfDelete)
- val tuple = mutationAction.handleAction(processedRDD, executorErrors, trxMgr)
-
- // In case user only has insert action.
- if (!(hasDelAction || hasUpdateAction)) {
- processedRDD.count()
+ val loadDF = Dataset.ofRows(sparkSession,
+ LogicalRDD(targetSchema.toAttributes,
+ processedRDD)(sparkSession))
+
+ loadDF.cache()
+ loadDF.count()
+ val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
+ val deltaRdd = sparkSession.read.format("carbon").load(deltaPath).rdd
+ val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
+ if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
+ carbonTable,
+ trxMgr.getLatestTrx.toString, false)) {
+ LOGGER.error("writing of update status file failed")
+ throw new CarbonMergeDataSetException("writing of update status file
failed")
+ }
+ Some(UpdateTableModel(true, trxMgr.getLatestTrx,
+ executorErrors, tuple._2, true))
+ } else {
+ None
}
+ CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
+ .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "false")
+
+ CarbonInsertIntoWithDf(
+ databaseNameOp = Some(carbonTable.getDatabaseName),
+ tableName = carbonTable.getTableName,
+ options = Map(("fileheader" -> header)),
+ isOverwriteTable = false,
+ dataFrame = loadDF.select(tableCols.map(col): _*),
+ updateModel = updateTableModel,
+ tableInfoOp = Some(carbonTable.getTableInfo)).process(sparkSession)
+
LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
LOGGER.info(
- " Time taken to merge data : " + tuple + " :: " +
(System.currentTimeMillis() - st))
-
- val segment = new Segment(model.getSegmentId,
- SegmentFileStore.genSegmentFileName(
- model.getSegmentId,
- System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
- model.getSegmentId), Map.empty[String, String].asJava)
- val writeSegment =
- SegmentFileStore.writeSegmentFile(carbonTable, segment)
-
- if (writeSegment) {
- SegmentFileStore.updateTableStatusFile(
- carbonTable,
- model.getSegmentId,
- segment.getSegmentFileName,
- carbonTable.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
- SegmentStatus.SUCCESS)
- } else {
- CarbonLoaderUtil.updateTableStatusForFailure(model)
- }
+ " Time taken to merge data :: " + (System.currentTimeMillis() - st))
- if (hasDelAction || hasUpdateAction) {
- if (CarbonUpdateUtil.updateSegmentStatus(tuple._1, carbonTable,
- trxMgr.getLatestTrx.toString, false) &&
- CarbonUpdateUtil
- .updateTableMetadataStatus(
- model.getLoadMetadataDetails.asScala.map(l =>
- new Segment(l.getMergedLoadName,
- l.getSegmentFile)).toSet.asJava,
- carbonTable,
- trxMgr.getLatestTrx.toString,
- true,
- tuple._2.asJava)) {
- LOGGER.info(s"Merge data operation is successful for " +
- s"${ carbonTable.getDatabaseName }.${
carbonTable.getTableName }")
- } else {
- throw new CarbonMergeDataSetException("Saving update status or table
status failed")
- }
- }
// Load the history table if the inserthistorytable action is added by user.
HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head, carbonTable,
trxMgr, mutationAction, mergeMatches)
+ // Do IUD Compaction.
+ HorizontalCompaction.tryHorizontalCompaction(
+ sparkSession, carbonTable, isUpdateOperation = false)
Seq.empty
}
@@ -240,11 +243,10 @@ case class CarbonMergeDataSetCommand(
private def processIUD(sparkSession: SparkSession,
frame: DataFrame,
carbonTable: CarbonTable,
- model: CarbonLoadModel,
projections: Seq[Seq[MergeProjection]],
+ targetSchema: StructType,
stats: Stats) = {
val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
- val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, conf)
val frameCols = frame.queryExecution.analyzed.output
val status = frameCols.length - 1
val tupleId = frameCols.zipWithIndex
@@ -252,34 +254,40 @@ case class CarbonMergeDataSetCommand(
val insertedRows = stats.insertedRows
val updatedRows = stats.updatedRows
val deletedRows = stats.deletedRows
- frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
+ val job = Job.getInstance(conf)
+ job.setOutputKeyClass(classOf[Void])
+ job.setOutputValueClass(classOf[InternalRow])
+ val uuid = UUID.randomUUID.toString
+ job.setJobID(new JobID(uuid, 0))
+ val path = carbonTable.getTablePath + "/" + job.getJobID
+ FileOutputFormat.setOutputPath(job, new Path(path))
+ val schema =
+ org.apache.spark.sql.types.StructType(Seq(
+ StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType),
+ StructField(status_on_mergeds, IntegerType)))
+ val factory =
+ new SparkCarbonFileFormat().prepareWrite(sparkSession, job,
+ carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, schema)
+ val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
+ (frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
mapPartitionsWithIndex { case (index, iter) =>
+ CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
+ .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
val confB = config.value.value
- CarbonTableOutputFormat.setCarbonTable(confB, carbonTable)
- model.setTaskNo(index.toString)
- CarbonTableOutputFormat.setLoadModel(confB, model)
- val jobId = new JobID(UUID.randomUUID.toString, 0)
- val task = new TaskID(jobId, TaskType.MAP, index)
+ val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
val attemptID = new TaskAttemptID(task, index)
val context = new TaskAttemptContextImpl(confB, attemptID)
- val writer = new CarbonTableOutputFormat().getRecordWriter(context)
- val writable = new ObjectArrayWritable
+ val writer = factory.newInstance(path, schema, context)
val projLen = projections.length
- val schema =
- org.apache.spark.sql.types.StructType(Seq(
- StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType),
- StructField("status", IntegerType)))
- new Iterator[Row] {
- override def hasNext: Boolean = {
- if (iter.hasNext) {
- true
- } else {
- writer.close(context)
- false
- }
+ new Iterator[InternalRow] {
+ val queue = new util.LinkedList[InternalRow]()
+ override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else {
+ writer.close()
+ false
}
- override def next(): Row = {
+ override def next(): InternalRow = {
+ if (!queue.isEmpty) return queue.poll()
val row = iter.next()
val rowWithSchema = row.asInstanceOf[GenericRowWithSchema]
val is = row.get(status)
@@ -288,29 +296,23 @@ case class CarbonMergeDataSetCommand(
var insertedCount = 0
if (is != null) {
val isInt = is.asInstanceOf[Int]
- var i = 0;
+ var i = 0
while (i < projLen) {
- if ((isInt & (1 << i)) == (1 << i)) {
- projections(i).foreach { p =>
- if (!p.isDelete) {
- if (p.isUpdate) {
- isUpdate = p.isUpdate
- }
- writable.set(p(rowWithSchema))
- writer.write(NullWritable.get(), writable)
- insertedCount += 1
- } else {
- isDelete = true
- }
- }
+ if ((isInt & (1 << i)) == (1 << i)) projections(i).foreach { p
=>
+ if (!p.isDelete) {
+ if (p.isUpdate) isUpdate = p.isUpdate
+ queue.add(p(rowWithSchema))
+ insertedCount += 1
+ } else isDelete = true
}
i = i + 1
}
}
val newArray = new Array[Any](2)
- newArray(0) = row.getString(tupleId)
+ newArray(0) = UTF8String.fromString(row.getString(tupleId))
if (isUpdate && isDelete) {
newArray(1) = 102
+ writer.write(new GenericInternalRow(newArray))
updatedRows.add(1)
deletedRows.add(1)
insertedCount -= 1
@@ -318,17 +320,23 @@ case class CarbonMergeDataSetCommand(
updatedRows.add(1)
newArray(1) = 101
insertedCount -= 1
+ writer.write(new GenericInternalRow(newArray))
} else if (isDelete) {
newArray(1) = 100
deletedRows.add(1)
- } else {
- newArray(1) = is
+ writer.write(new GenericInternalRow(newArray))
}
insertedRows.add(insertedCount)
- new GenericRowWithSchema(newArray, schema)
+ if (!queue.isEmpty) queue.poll() else {
+ val values = new Array[Any](targetSchema.length)
+ new GenericInternalRow(values)
+ }
}
}
- }.cache()
+ }.filter { row =>
+ val status = row.get(targetSchema.length-1, IntegerType)
+ status != null
+ }, path)
}
private def createLongAccumalator(name: String) = {
@@ -510,6 +518,13 @@ case class CarbonMergeDataSetCommand(
}.filter(_ != null)
}
+ private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = {
+ plan collect {
+ case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ }
+ }
+
private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = {
val insertHistOfUpdate = mergeMatches.matchList.exists(p =>
p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
index c7f6e6c..ffd6ca1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
@@ -20,13 +20,16 @@ import scala.collection.JavaConverters._
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.DataTypeConverterImpl
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.spark.rdd.CarbonDeltaRowScanRDD
-import org.apache.carbondata.spark.readsupport.SparkGenericRowReadSupportImpl
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
object HistoryTableLoadHelper {
@@ -93,7 +96,7 @@ object HistoryTableLoadHelper {
unionDf.createOrReplaceTempView(alias)
val start = System.currentTimeMillis()
sparkSession.sql(s"insert into ${ insert.historyTable.quotedString } " +
- s"select * from ${ alias }").collect()
+ s"select * from ${ alias }")
LOGGER.info("Time taken to insert into history table " +
(System.currentTimeMillis() - start))
}
}
@@ -107,20 +110,23 @@ object HistoryTableLoadHelper {
insertHist: InsertInHistoryTableAction,
histDataFrame: Dataset[Row],
factTimestamp: Long) = {
- val rdd1 = new CarbonDeltaRowScanRDD[Row](sparkSession,
+ val rdd1 = new CarbonDeltaRowScanRDD[InternalRow](sparkSession,
carbonTable.getTableInfo.serialize(),
carbonTable.getTableInfo,
- null,
+ CarbonFilters.getPartitions(
+ Seq.empty,
+ sparkSession,
+ TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).orNull,
new CarbonProjection(
carbonTable.getCreateOrderColumn().asScala.map(_.getColName).toArray),
null,
carbonTable.getAbsoluteTableIdentifier,
new CarbonInputMetrics,
- classOf[DataTypeConverterImpl],
- classOf[SparkGenericRowReadSupportImpl],
+ classOf[SparkDataTypeConverterImpl],
+ classOf[SparkRowReadSupportImpl],
factTimestamp.toString)
-
- val frame1 = sparkSession.createDataFrame(rdd1, rltn.carbonRelation.schema)
+ val frame1 = Dataset.ofRows(sparkSession,
+ LogicalRDD(rltn.carbonRelation.output, rdd1)(sparkSession))
val histOutput = histDataFrame.queryExecution.analyzed.output
val cols = histOutput.map { a =>
insertHist.insertMap.find(p => p._1.toString().equalsIgnoreCase(a.name)) match {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
index 1245bd4..f0c6945 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
@@ -19,17 +19,16 @@ package org.apache.spark.sql.execution.command.mutation.merge
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema, InterpretedMutableProjection, Projection}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types.{DateType, TimestampType}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
/**
* Creates the projection for each action like update,delete or insert.
*/
case class MergeProjection(
@transient tableCols: Seq[String],
+ @transient statusCol : String,
@transient ds: Dataset[Row],
@transient rltn: CarbonDatasourceHadoopRelation,
@transient sparkSession: SparkSession,
@@ -40,9 +39,9 @@ case class MergeProjection(
val isUpdate = mergeAction.isInstanceOf[UpdateAction]
val isDelete = mergeAction.isInstanceOf[DeleteAction]
- def apply(row: GenericRowWithSchema): Array[Object] = {
+ def apply(row: GenericRowWithSchema): InternalRow = {
// TODO we can avoid these multiple conversions if this is added as a SparkPlan node.
- val values = row.toSeq.map {
+ val values = row.values.map {
case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b)
@@ -51,31 +50,7 @@ case class MergeProjection(
case value => value
}
- val outputRow = projection(new GenericInternalRow(values.toArray))
- .asInstanceOf[GenericInternalRow]
-
- val array = outputRow.values.clone()
- var i = 0
- while (i < array.length) {
- output(i).dataType match {
- case d: DateType =>
- if (array(i) == null) {
- array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
- } else {
- array(i) = (array(i).asInstanceOf[Int] + cutOffDate)
- }
- case d: TimestampType =>
- if (array(i) == null) {
- array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
- } else {
- array(i) = (array(i).asInstanceOf[Long] / 1000)
- }
-
- case _ =>
- }
- i += 1
- }
- array.asInstanceOf[Array[Object]]
+ projection(new GenericInternalRow(values)).asInstanceOf[GenericInternalRow]
}
val (projection, output) = generateProjection
@@ -106,7 +81,10 @@ case class MergeProjection(
if (output.contains(null)) {
throw new CarbonMergeDataSetException(s"Not all columns are mapped")
}
- (new InterpretedMutableProjection(output, ds.queryExecution.analyzed.output), expecOutput)
+ (new InterpretedMutableProjection(output++Seq(
+ ds.queryExecution.analyzed.resolveQuoted(statusCol,
+ sparkSession.sessionState.analyzer.resolver).get),
+ ds.queryExecution.analyzed.output), expecOutput)
} else {
(null, null)
}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index f91bce0..c19a132 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -83,6 +83,22 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
(dwSelframe, odsframe)
}
+ private def initializePartition = {
+ val initframe = generateData(10)
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "order")
+ .option("partitionColumns", "c_name")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val dwframe = sqlContext.read.format("carbondata").option("tableName",
"order").load()
+ val dwSelframe = dwframe.as("A")
+
+ val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+ (dwSelframe, odsframe)
+ }
+
test("test basic merge update with all mappings") {
sql("drop table if exists order")
val (dwSelframe, odsframe) = initialize
@@ -205,6 +221,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ sql("select * from order").show()
checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -392,6 +409,52 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select count(*) from order_hist where c_name =
'insert'"), Seq(Row(2)))
}
+ test("test merge update with insert, insert with condition and expression
and delete with insert history action with partition") {
+ sql("drop table if exists order")
+ sql("drop table if exists order_hist")
+ sql("create table order_hist(id string, name string, quantity int, price
int, state int) PARTITIONED BY (c_name String) STORED AS carbondata")
+ val (dwSelframe, odsframe) = initializePartition
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ val insertMap_u = Map(col("id") -> col("id"),
+ col("name") -> col("name"),
+ col("c_name") -> lit("insert"),
+ col("quantity") -> col("quantity"),
+ col("price") -> expr("price"),
+ col("state") -> col("state"))
+
+ val insertMap_d = Map(col("id") -> col("id"),
+ col("name") -> col("name"),
+ col("c_name") -> lit("delete"),
+ col("quantity") -> col("quantity"),
+ col("price") -> expr("price"),
+ col("state") -> col("state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
TableIdentifier("order_hist"))))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+ matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist"))))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ checkAnswer(sql("select count(*) from order_hist where c_name =
'delete'"), Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order_hist where c_name =
'insert'"), Seq(Row(2)))
+ }
+
test("check the scd ") {
sql("drop table if exists customers")
@@ -447,6 +510,55 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
}
+ test("check the ccd with partition") {
+ sql("drop table if exists target")
+
+ val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "0"),
+ Row("b", "1"),
+ Row("c", "2"),
+ Row("d", "3")
+ ).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
+
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .option("partitionColumns", "value")
+ .mode(SaveMode.Overwrite)
+ .save()
+ val target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+ var ccd =
+ sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "10", false, 0),
+ Row("a", null, true, 1), // a was updated and then deleted
+ Row("b", null, true, 2), // b was just deleted once
+ Row("c", null, true, 3), // c was deleted and then updated twice
+ Row("c", "20", false, 4),
+ Row("c", "200", false, 5),
+ Row("e", "100", false, 6) // new key
+ ).asJava,
+ StructType(Seq(StructField("key", StringType),
+ StructField("newValue", StringType),
+ StructField("deleted", BooleanType), StructField("time",
IntegerType))))
+ ccd.createOrReplaceTempView("changes")
+
+ ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as
deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM
changes GROUP BY key)")
+
+ val updateMap = Map("key" -> "B.key", "value" ->
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+ val insertMap = Map("key" -> "B.key", "value" ->
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+ target.as("A").merge(ccd.as("B"), "A.key=B.key").
+ whenMatched("B.deleted=false").
+ updateExpr(updateMap).
+ whenNotMatched("B.deleted=false").
+ insertExpr(insertMap).
+ whenMatched("B.deleted=true").
+ delete().execute()
+ checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
+ checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"),
Row("d", "3"), Row("e", "100")))
+ }
+
test("check the ccd ") {
sql("drop table if exists target")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index dc3fc3c..cdac39f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -316,6 +316,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + File.separator
+ carbonDataFileName;
try {
+ FileFactory.mkdirs(model.getCarbonDataDirectoryPath());
if (enableDirectlyWriteDataToStorePath) {
// the block size will be twice the block_size specified by user to make sure that
// one carbondata file only consists exactly one HDFS block.