This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5b8ac84 [CARBONDATA-3597] Support Merge for SCD and CCD scenarios
5b8ac84 is described below
commit 5b8ac8478847ef6962f39362f8e16f30a9cb6a03
Author: ravipesala <[email protected]>
AuthorDate: Mon Dec 30 11:38:36 2019 +0800
[CARBONDATA-3597] Support Merge for SCD and CCD scenarios
Added a DataFrame API to merge datasets online and apply the configured actions
according to the specified conditions.
The supported Dataset API is as follows:
targetDS.merge(sourceDS, <condition>).
whenMatched(<condition>).
updateExpr(updateMap).
insertExpr(insertMap_u).
whenNotMatched(<condition>).
insertExpr(insertMap).
whenNotMatchedAndExistsOnlyOnTarget(<condition>).
delete().
insertHistoryTableExpr(insertMap_d, <table_name>).
execute()
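A minimal end-to-end sketch of this builder API, assembled from the MergeTestCase
suite added in this commit (the "order" table, the source DataFrame named changes
and the session variable spark are illustrative assumptions):

  import org.apache.spark.sql.CarbonSession._   // brings the implicit merge() into scope
  import org.apache.spark.sql.functions._

  // Alias target and source so that conditions and maps can refer to A.<col> and B.<col>.
  val targetDS = spark.read.format("carbondata").option("tableName", "order").load().as("A")
  val sourceDS = changes.as("B")   // assumed incoming change set

  val updateMap = Map(col("id") -> col("A.id"),
    col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
  val insertMap = Map(col("id") -> col("B.id"),
    col("name") -> col("B.name"),
    col("c_name") -> col("B.c_name"),
    col("quantity") -> col("B.quantity"),
    col("price") -> col("B.price"),
    col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]

  targetDS.merge(sourceDS, col("A.id").equalTo(col("B.id"))).
    whenMatched(col("A.state") =!= col("B.state")).
    updateExpr(updateMap).
    whenNotMatched().
    insertExpr(insertMap).
    whenNotMatchedAndExistsOnlyOnTarget().
    delete().
    execute()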
This closes #3483
---
.../carbondata/core/mutate/CarbonUpdateUtil.java | 53 +-
.../impl/DictionaryBasedResultCollector.java | 19 +-
.../RestructureBasedDictionaryResultCollector.java | 9 +-
.../collector/impl/RowIdBasedResultCollector.java | 9 +-
.../scan/executor/impl/AbstractQueryExecutor.java | 1 +
.../scan/executor/infos/BlockExecutionInfo.java | 14 +
.../carbondata/core/scan/model/QueryModel.java | 14 +
.../core/scan/result/BlockletScannedResult.java | 1 -
.../scan/result/impl/FilterQueryScannedResult.java | 2 -
.../result/impl/NonFilterQueryScannedResult.java | 1 -
.../statusmanager/SegmentUpdateStatusManager.java | 74 ++-
dev/javastyle-config.xml | 3 -
.../carbondata/hadoop/api/CarbonInputFormat.java | 8 +-
.../hadoop/api/CarbonTableInputFormat.java | 13 +-
.../spark/testsuite/merge/MergeTestCase.scala | 501 +++++++++++++++++++
.../spark/rdd/CarbonDeltaRowScanRDD.scala | 90 ++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
.../apache/spark/sql/hive/DistributionUtil.scala | 2 +-
.../SparkGenericRowReadSupportImpl.java | 59 +++
.../indexserver/DistributedRDDUtils.scala | 4 +-
.../scala/org/apache/spark/sql/CarbonSession.scala | 11 +
.../command/mutation/DeleteExecution.scala | 260 ++++++----
.../mutation/merge/CarbonMergeDataSetCommand.scala | 531 +++++++++++++++++++++
.../merge/CarbonMergeDataSetException.scala | 33 ++
.../mutation/merge/HistoryTableLoadHelper.scala | 136 ++++++
.../mutation/merge/MergeDataSetBuilder.scala | 134 ++++++
.../command/mutation/merge/MergeProjection.scala | 114 +++++
.../command/mutation/merge/MutationAction.scala | 174 +++++++
.../command/mutation/merge/TranxManager.scala | 62 +++
.../command/mutation/merge/interfaces.scala | 88 ++++
30 files changed, 2259 insertions(+), 163 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 2b3096e..c9b4360 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -134,30 +134,7 @@ public class CarbonUpdateUtil {
List<SegmentUpdateDetails> oldList = new
ArrayList(Arrays.asList(oldDetails));
for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
- int index = oldList.indexOf(newBlockEntry);
- if (index != -1) {
- // update the element in existing list.
- SegmentUpdateDetails blockDetail = oldList.get(index);
- if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() ||
(isCompaction)) {
- blockDetail
-
.setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
- }
-
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
- blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
-
blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
- // If the start and end time is different then the delta is there
in multiple files so
- // add them to the list to get the delta files easily with out
listing.
- if (!blockDetail.getDeleteDeltaStartTimestamp()
- .equals(blockDetail.getDeleteDeltaEndTimestamp())) {
-
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
-
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
- } else {
- blockDetail.setDeltaFileStamps(null);
- }
- } else {
- // add the new details to the list.
- oldList.add(newBlockEntry);
- }
+ mergeSegmentUpdate(isCompaction, oldList, newBlockEntry);
}
segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList,
updateStatusFileIdentifier);
@@ -180,6 +157,34 @@ public class CarbonUpdateUtil {
return status;
}
+ public static void mergeSegmentUpdate(boolean isCompaction,
List<SegmentUpdateDetails> oldList,
+ SegmentUpdateDetails newBlockEntry) {
+ int index = oldList.indexOf(newBlockEntry);
+ if (index != -1) {
+ // update the element in existing list.
+ SegmentUpdateDetails blockDetail = oldList.get(index);
+ if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() ||
isCompaction) {
+ blockDetail
+
.setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
+ }
+
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
+ blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
+ blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
+      // If the start and end timestamps are different then the delta exists in multiple files,
+      // so add them to the list to get the delta files easily without listing the directory.
+ if (!blockDetail.getDeleteDeltaStartTimestamp()
+ .equals(blockDetail.getDeleteDeltaEndTimestamp())) {
+
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
+
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
+ } else {
+ blockDetail.setDeltaFileStamps(null);
+ }
+ } else {
+ // add the new details to the list.
+ oldList.add(newBlockEntry);
+ }
+ }
+
/**
* Update table status
* @param updatedSegmentsList
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index d011da3..554d11a 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -98,6 +98,8 @@ public class DictionaryBasedResultCollector extends
AbstractScannedResultCollect
private Map<Integer, Map<CarbonDimension, ByteBuffer>>
mergedComplexDimensionDataMap =
new HashMap<>();
+ private boolean readOnlyDelta;
+
public DictionaryBasedResultCollector(BlockExecutionInfo
blockExecutionInfos) {
super(blockExecutionInfos);
queryDimensions = executionInfo.getProjectionDimensions();
@@ -105,7 +107,7 @@ public class DictionaryBasedResultCollector extends
AbstractScannedResultCollect
initDimensionAndMeasureIndexesForFillingData();
isDimensionExists = queryDimensions.length > 0;
this.comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap();
-
+ this.readOnlyDelta = executionInfo.isReadOnlyDelta();
}
/**
@@ -136,6 +138,16 @@ public class DictionaryBasedResultCollector extends
AbstractScannedResultCollect
}
}
while (scannedResult.hasNext() && rowCounter < batchSize) {
+ scannedResult.incrementCounter();
+ if (readOnlyDelta) {
+ if
(!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+ continue;
+ }
+ } else {
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId()))
{
+ continue;
+ }
+ }
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
@@ -151,11 +163,6 @@ public class DictionaryBasedResultCollector extends
AbstractScannedResultCollect
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys,
complexTypeKeyArray,
comlexDimensionInfoMap, row, i,
queryDimensions[i].getDimension().getOrdinal());
}
- } else {
- scannedResult.incrementCounter();
- }
- if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
- continue;
}
fillMeasureData(scannedResult, row);
if (isStructQueryType) {
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 3627e00..522aaf1 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -74,6 +74,10 @@ public class RestructureBasedDictionaryResultCollector
extends DictionaryBasedRe
Map<Integer, GenericQueryType> comlexDimensionInfoMap =
executionInfo.getComlexDimensionInfoMap();
while (scannedResult.hasNext() && rowCounter < batchSize) {
+ scannedResult.incrementCounter();
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+ continue;
+ }
Object[] row = new Object[queryDimensions.length + queryMeasures.length];
if (isDimensionExists) {
surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
@@ -101,11 +105,6 @@ public class RestructureBasedDictionaryResultCollector
extends DictionaryBasedRe
comlexDimensionInfoMap, row, i, executionInfo
.getProjectionDimensions()[segmentDimensionsIdx++].getDimension().getOrdinal());
}
- } else {
- scannedResult.incrementCounter();
- }
- if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
- continue;
}
fillMeasureData(scannedResult, row);
listBasedResult.add(row);
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
index 2111b02..7a0732b 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java
@@ -45,6 +45,10 @@ public class RowIdBasedResultCollector extends
DictionaryBasedResultCollector {
byte[][] complexTypeKeyArray;
int columnCount = queryDimensions.length + queryMeasures.length;
while (scannedResult.hasNext() && rowCounter < batchSize) {
+ scannedResult.incrementCounter();
+ if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
+ continue;
+ }
Object[] row = new Object[columnCount + 3];
row[columnCount] = scannedResult.getBlockletNumber();
row[columnCount + 1] = scannedResult.getCurrentPageCounter();
@@ -59,13 +63,8 @@ public class RowIdBasedResultCollector extends
DictionaryBasedResultCollector {
fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys,
complexTypeKeyArray,
comlexDimensionInfoMap, row, i,
queryDimensions[i].getDimension().getOrdinal());
}
- } else {
- scannedResult.incrementCounter();
}
row[columnCount + 2] = scannedResult.getCurrentRowId();
- if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) {
- continue;
- }
fillMeasureData(scannedResult, row);
listBasedResult.add(row);
rowCounter++;
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index c891ba2..ab21819 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -487,6 +487,7 @@ public abstract class AbstractQueryExecutor<E> implements
QueryExecutor<E> {
blockExecutionInfo
.setTotalNumberDimensionToRead(
segmentProperties.getDimensionOrdinalToChunkMapping().size());
+ blockExecutionInfo.setReadOnlyDelta(queryModel.isReadOnlyDelta());
if (queryModel.isReadPageByPage()) {
blockExecutionInfo.setPrefetchBlocklet(false);
LOGGER.info("Query prefetch is: false, read page by page");
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index 0bd053c..51368dd 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -228,6 +228,12 @@ public class BlockExecutionInfo {
private ReusableDataBuffer[] measureResusableDataBuffer;
/**
+   * It is used to read only the rows marked deleted in the delete delta of a particular
+   * version, i.e. to get the old updated/deleted data as it was before the update.
+ */
+ private boolean readOnlyDelta;
+
+ /**
* @param blockIndex the tableBlock to set
*/
public void setDataBlock(AbstractIndex blockIndex) {
@@ -659,4 +665,12 @@ public class BlockExecutionInfo {
public void setMeasureResusableDataBuffer(ReusableDataBuffer[]
measureResusableDataBuffer) {
this.measureResusableDataBuffer = measureResusableDataBuffer;
}
+
+ public boolean isReadOnlyDelta() {
+ return readOnlyDelta;
+ }
+
+ public void setReadOnlyDelta(boolean readOnlyDelta) {
+ this.readOnlyDelta = readOnlyDelta;
+ }
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d604e15..3f300e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -118,6 +118,12 @@ public class QueryModel {
*/
private boolean isDirectVectorFill;
+ /**
+   * It is used to read only the rows marked deleted in the delete delta of a particular
+   * version, i.e. to get the old updated/deleted data as it was before the update.
+ */
+ private boolean readOnlyDelta;
+
private QueryModel(CarbonTable carbonTable) {
tableBlockInfos = new ArrayList<TableBlockInfo>();
this.table = carbonTable;
@@ -396,6 +402,14 @@ public class QueryModel {
isDirectVectorFill = directVectorFill;
}
+ public boolean isReadOnlyDelta() {
+ return readOnlyDelta;
+ }
+
+ public void setReadOnlyDelta(boolean readOnlyDelta) {
+ this.readOnlyDelta = readOnlyDelta;
+ }
+
@Override
public String toString() {
return String.format("scan on table %s.%s, %d projection columns with
filter (%s)",
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index a4c1c6c..6c0ab4d 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -247,7 +247,6 @@ public abstract class BlockletScannedResult {
column =
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
.fillSurrogateKey(rowId, column, completeKey);
}
- rowCounter++;
return completeKey;
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 245135a..f338888 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -42,7 +42,6 @@ public class FilterQueryScannedResult extends
BlockletScannedResult {
*/
@Override
public byte[] getDictionaryKeyArray() {
- ++currentRow;
return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
}
@@ -52,7 +51,6 @@ public class FilterQueryScannedResult extends
BlockletScannedResult {
*/
@Override
public int[] getDictionaryKeyIntegerArray() {
- ++currentRow;
return
getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index c9f6b0c..98576fa 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -51,7 +51,6 @@ public class NonFilterQueryScannedResult extends
BlockletScannedResult {
*/
@Override
public int[] getDictionaryKeyIntegerArray() {
- ++currentRow;
return getDictionaryKeyIntegerArray(currentRow);
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 4cefea2..63f9471 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -17,20 +17,8 @@
package org.apache.carbondata.core.statusmanager;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.io.*;
+import java.util.*;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -80,15 +68,30 @@ public class SegmentUpdateStatusManager {
public SegmentUpdateStatusManager(CarbonTable table,
LoadMetadataDetails[] segmentDetails) {
+ this(table, segmentDetails, null);
+ }
+
+ /**
+   * It takes updateVersion as an additional parameter. The user can specify the
+   * updateVersion for which data should be retrieved. It is useful to get the
+   * historical changed data of a particular version.
+ */
+ public SegmentUpdateStatusManager(CarbonTable table,
+ LoadMetadataDetails[] segmentDetails, String updateVersion) {
this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update
always requires to work
// on latest file status.
this.segmentDetails = segmentDetails;
updateDetails = readLoadMetadata();
+ updateUpdateDetails(updateVersion);
populateMap();
}
public SegmentUpdateStatusManager(CarbonTable table) {
+ this(table, (String) null);
+ }
+
+ public SegmentUpdateStatusManager(CarbonTable table, String updateVersion) {
this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update
always requires to work
// on latest file status.
@@ -104,10 +107,34 @@ public class SegmentUpdateStatusManager {
} else {
updateDetails = new SegmentUpdateDetails[0];
}
+ updateUpdateDetails(updateVersion);
populateMap();
}
/**
+   * It keeps only the SegmentUpdateDetails of the given updateVersion; it is used to get
+   * the history of updated/deleted data for that version.
+ */
+ private void updateUpdateDetails(String updateVersion) {
+ if (updateVersion != null) {
+ List<SegmentUpdateDetails> newupdateDetails = new ArrayList<>();
+ for (SegmentUpdateDetails updateDetail : updateDetails) {
+ if (updateDetail.getDeltaFileStamps() != null) {
+ if (updateDetail.getDeltaFileStamps().contains(updateVersion)) {
+ HashSet<String> set = new HashSet<>();
+ set.add(updateVersion);
+ updateDetail.setDeltaFileStamps(set);
+ newupdateDetails.add(updateDetail);
+ }
+ } else if
(updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) {
+ newupdateDetails.add(updateDetail);
+ }
+ }
+ updateDetails = newupdateDetails.toArray(new SegmentUpdateDetails[0]);
+ }
+ }
+
+ /**
* populate the block and its details in a map.
*/
private void populateMap() {
@@ -640,21 +667,30 @@ public class SegmentUpdateStatusManager {
* @return
*/
public SegmentUpdateDetails[] readLoadMetadata() {
+ // get the updated status file identifier from the table status.
+ String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
+ return readLoadMetadata(tableUpdateStatusIdentifier,
identifier.getTablePath());
+ }
+
+ /**
+ * This method loads segment update details
+ *
+ * @return
+ */
+ public static SegmentUpdateDetails[] readLoadMetadata(String
tableUpdateStatusIdentifier,
+ String tablePath) {
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;
- // get the updated status file identifier from the table status.
- String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
-
if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) {
return new SegmentUpdateDetails[0];
}
String tableUpdateStatusPath =
- CarbonTablePath.getMetadataPath(identifier.getTablePath()) +
+ CarbonTablePath.getMetadataPath(tablePath) +
CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier;
AtomicFileOperations fileOperation =
AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath);
@@ -735,7 +771,7 @@ public class SegmentUpdateStatusManager {
*
* @param streams - streams to close.
*/
- private void closeStreams(Closeable... streams) {
+ private static void closeStreams(Closeable... streams) {
// Added if to avoid NullPointerException in case one stream is being
passed as null
if (null != streams) {
for (Closeable stream : streams) {
diff --git a/dev/javastyle-config.xml b/dev/javastyle-config.xml
index 824cbc8..332c85a 100644
--- a/dev/javastyle-config.xml
+++ b/dev/javastyle-config.xml
@@ -175,9 +175,6 @@
<message key="import.redundancy" value="Redundant import {0}."/>
</module>
- <!-- Checks for star import. -->
- <module name="AvoidStarImport"/>
-
<!-- Checks for placement of the left curly brace ('{'). -->
<module name="LeftCurly"/>
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 10aabf2..ae8db43 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -120,6 +120,7 @@ public abstract class CarbonInputFormat<T> extends
FileInputFormat<Void, T> {
private static final String FGDATAMAP_PRUNING =
"mapreduce.input.carboninputformat.fgdatamap";
private static final String READ_COMMITTED_SCOPE =
"mapreduce.input.carboninputformat.read.committed.scope";
+ private static final String READ_ONLY_DELTA = "readDeltaOnly";
// record segment number and hit blocks
protected int numSegments = 0;
@@ -688,11 +689,16 @@ m filterExpression
if (dataMapFilter != null) {
checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
}
- return new QueryModelBuilder(carbonTable)
+ QueryModel queryModel = new QueryModelBuilder(carbonTable)
.projectColumns(projectColumns)
.filterExpression(dataMapFilter)
.dataConverter(getDataTypeConverter(configuration))
.build();
+ String readDeltaOnly = configuration.get(READ_ONLY_DELTA);
+ if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) {
+ queryModel.setReadOnlyDelta(true);
+ }
+ return queryModel;
}
/**
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 5468c24..c47cdd6 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -85,6 +85,7 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
"mapreduce.input.carboninputformat.transactional";
public static final String DATABASE_NAME =
"mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME =
"mapreduce.input.carboninputformat.tableName";
+ public static final String UPDATE_DELTA_VERSION = "updateDeltaVersion";
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
private ReadCommittedScope readCommittedScope;
@@ -107,9 +108,15 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
}
this.readCommittedScope = getReadCommitted(job, identifier);
LoadMetadataDetails[] loadMetadataDetails =
readCommittedScope.getSegmentList();
-
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
+ String updateDeltaVersion =
job.getConfiguration().get(UPDATE_DELTA_VERSION);
+ SegmentUpdateStatusManager updateStatusManager;
+ if (updateDeltaVersion != null) {
+ updateStatusManager =
+ new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails,
updateDeltaVersion);
+ } else {
+ updateStatusManager =
+ new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
+ }
List<String> invalidSegmentIds = new ArrayList<>();
List<Segment> streamSegments = null;
// get all valid segments and set them into the configuration
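A minimal sketch, assuming a Hadoop Configuration otherwise already prepared for
CarbonTableInputFormat (table path, projection, etc. omitted), of how the two
properties introduced above combine to scan only the rows touched by a single
update version; the version value below is a placeholder:

  import org.apache.hadoop.conf.Configuration

  val conf = new Configuration()
  // Restrict the SegmentUpdateStatusManager to the delta files written by one update version.
  conf.set("updateDeltaVersion", "1577678316000")   // placeholder delete-delta timestamp
  // Return only the rows marked deleted in that delta, i.e. the pre-update values of the
  // updated/deleted rows (CarbonInputFormat sets QueryModel.readOnlyDelta accordingly).
  conf.set("readDeltaOnly", "true")

CarbonDeltaRowScanRDD below sets exactly these two keys when it creates its input format.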
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
new file mode 100644
index 0000000..f91bce0
--- /dev/null
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.merge
+
+import scala.collection.JavaConverters._
+import java.sql.Date
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches,
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched,
WhenNotMatchedAndExistsOnlyOnTarget}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType,
StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for the Dataset merge API, covering SCD and CCD scenarios
+ */
+
+class MergeTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+
+ }
+
+ def generateData(numOrders: Int = 10): DataFrame = {
+ import sqlContext.implicits._
+ sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+ .map { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
+ }.toDF("id", "name", "c_name", "quantity", "price", "state")
+ }
+
+ def generateFullCDC(
+ numOrders: Int,
+ numUpdatedOrders: Int,
+ newState: Int,
+ oldState: Int,
+ numNewOrders: Int
+ ): DataFrame = {
+ import sqlContext.implicits._
+ val ds1 = sqlContext.sparkContext.parallelize(numNewOrders+1 to
(numOrders), 4)
+ .map {x =>
+ if (x <= numNewOrders + numUpdatedOrders) {
+ ("id"+x, s"order$x",s"customer$x", x*10, x*75, newState)
+ } else {
+ ("id"+x, s"order$x",s"customer$x", x*10, x*75, oldState)
+ }
+ }.toDF("id", "name", "c_name", "quantity", "price", "state")
+ val ds2 = sqlContext.sparkContext.parallelize(1 to numNewOrders, 4)
+ .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, oldState)
+ }.toDS().toDF()
+ ds1.union(ds2)
+ }
+
+ private def initialize = {
+ val initframe = generateData(10)
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "order")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val dwframe = sqlContext.read.format("carbondata").option("tableName",
"order").load()
+ val dwSelframe = dwframe.as("A")
+
+ val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B")
+ (dwSelframe, odsframe)
+ }
+
+ test("test basic merge update with all mappings") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ val updateMap = Map("id" -> "A.id",
+ "name" -> "B.name",
+ "c_name" -> "B.c_name",
+ "quantity" -> "B.quantity",
+ "price" -> "B.price",
+ "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+ col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test basic merge update with few mappings") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe, "A.id=B.id").whenMatched("A.state <>
B.state").updateExpr(updateMap).execute()
+
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test basic merge update with few mappings and expressions") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ val updateMap = Map("id" -> "A.id",
+ "price" -> "B.price * 100",
+ "state" -> "B.state").asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
+ col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+
+    checkAnswer(sql("select price from order where state = 2"),
Seq(Row(22500), Row(30000)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test basic merge update with few mappings with out condition") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe,
col("A.id").equalTo(col("B.id"))).whenMatched().updateExpr(updateMap).execute()
+
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test merge insert with condition") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ "c_name" -> col("B.c_name"),
+ col("quantity") -> "B.quantity",
+ col("price") -> col("B.price"),
+ col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).
+ whenNotMatched(col("A.id").isNull.and(col("B.id").isNotNull)).
+ insertExpr(insertMap).execute()
+
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ }
+
+ test("test merge update and insert with out condition") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> col("B.price"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+
+ val st = System.currentTimeMillis()
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test merge update and insert with condition") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> col("B.price"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++=
Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap)))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test merge update and insert with condition and expression") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++=
Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap)))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ }
+
+ test("test merge with only delete action") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
+ }
+
+ test("test merge update and delete action") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ }
+
+ test("test merge update and insert with condition and expression and delete
action") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+ matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order where id like 'newid%'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ }
+
+ test("test merge update with insert, insert with condition and expression
and delete with insert action") {
+ sql("drop table if exists order")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> "B.price + 1",
+ col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
+
+ val insertMap_u = Map(col("id") -> col("A.id"),
+ col("name") -> col("A.name"),
+ col("c_name") -> lit("insert"),
+ col("quantity") -> col("A.quantity"),
+ col("price") -> expr("A.price"),
+ col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]]
+
+ val insertMap_d = Map(col("id") -> col("A.id"),
+ col("name") -> col("A.name"),
+ col("c_name") -> lit("delete"),
+ col("quantity") -> col("A.quantity"),
+ col("price") -> expr("A.price"),
+ col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]]
+
+ dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).
+ whenMatched(col("A.state") =!= col("B.state")).
+ updateExpr(updateMap).insertExpr(insertMap_u).
+ whenNotMatched().
+ insertExpr(insertMap).
+ whenNotMatchedAndExistsOnlyOnTarget().
+ delete().
+ insertExpr(insertMap_d).
+ execute()
+ sql("select * from order").show()
+ checkAnswer(sql("select count(*) from order where c_name = 'delete'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order where c_name = 'insert'"),
Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order"), Seq(Row(14)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ }
+
+ test("test merge update with insert, insert with condition and expression
and delete with insert history action") {
+ sql("drop table if exists order")
+ sql("drop table if exists order_hist")
+ sql("create table order_hist(id string, name string, c_name string,
quantity int, price int, state int) stored as carbondata")
+ val (dwSelframe, odsframe) = initialize
+
+ var matches = Seq.empty[MergeMatch]
+ val updateMap = Map(col("id") -> col("A.id"),
+ col("price") -> expr("B.price + 1"),
+ col("state") -> col("B.state"))
+
+ val insertMap = Map(col("id") -> col("B.id"),
+ col("name") -> col("B.name"),
+ col("c_name") -> col("B.c_name"),
+ col("quantity") -> col("B.quantity"),
+ col("price") -> expr("B.price * 100"),
+ col("state") -> col("B.state"))
+
+ val insertMap_u = Map(col("id") -> col("id"),
+ col("name") -> col("name"),
+ col("c_name") -> lit("insert"),
+ col("quantity") -> col("quantity"),
+ col("price") -> expr("price"),
+ col("state") -> col("state"))
+
+ val insertMap_d = Map(col("id") -> col("id"),
+ col("name") -> col("name"),
+ col("c_name") -> lit("delete"),
+ col("quantity") -> col("quantity"),
+ col("price") -> expr("price"),
+ col("state") -> col("state"))
+
+ matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
TableIdentifier("order_hist"))))
+ matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+ matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
TableIdentifier("order_hist"))))
+
+ CarbonMergeDataSetCommand(dwSelframe,
+ odsframe,
+ MergeDataSetMatches(col("A.id").equalTo(col("B.id")),
matches.toList)).run(sqlContext.sparkSession)
+ checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+ checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+ checkAnswer(sql("select price from order where id = 'newid1'"),
Seq(Row(7500)))
+ checkAnswer(sql("select count(*) from order_hist where c_name =
'delete'"), Seq(Row(2)))
+ checkAnswer(sql("select count(*) from order_hist where c_name =
'insert'"), Seq(Row(2)))
+ }
+
+ test("check the scd ") {
+ sql("drop table if exists customers")
+
+ val initframe =
+ sqlContext.sparkSession.createDataFrame(Seq(
+ Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")),
+ Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null),
+ Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null),
+ Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null)
+ ).asJava, StructType(Seq(StructField("customerId", IntegerType),
StructField("address", StringType), StructField("current", BooleanType),
StructField("effectiveDate", DateType), StructField("endDate", DateType))))
+ initframe.printSchema()
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "customers")
+ .mode(SaveMode.Overwrite)
+ .save()
+ var customer = sqlContext.read.format("carbondata").option("tableName",
"customers").load()
+ customer = customer.as("A")
+ var updates =
+ sqlContext.sparkSession.createDataFrame(Seq(
+ Row(1, "new address for 1", Date.valueOf("2018-03-03")),
+ Row(3, "current address for 3", Date.valueOf("2018-04-04")), // new
address same as current address for customer 3
+ Row(4, "new address for 4", Date.valueOf("2018-04-04"))
+ ).asJava, StructType(Seq(StructField("customerId", IntegerType),
StructField("address", StringType), StructField("effectiveDate", DateType))))
+ updates = updates.as("B")
+
+ val updateMap = Map(col("current") -> lit(false),
+ col("endDate") -> col("B.effectiveDate")).asInstanceOf[Map[Any, Any]]
+
+ val insertMap = Map(col("customerId") -> col("B.customerId"),
+ col("address") -> col("B.address"),
+ col("current") -> lit(true),
+ col("effectiveDate") -> col("B.effectiveDate"),
+ col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]]
+
+ val insertMap_u = Map(col("customerId") -> col("B.customerId"),
+ col("address") -> col("B.address"),
+ col("current") -> lit(true),
+ col("effectiveDate") -> col("B.effectiveDate"),
+ col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]]
+
+ customer.merge(updates, "A.customerId=B.customerId").
+ whenMatched((col("A.address") =!=
col("B.address")).and(col("A.current").equalTo(lit(true)))).
+ updateExpr(updateMap).
+ insertExpr(insertMap_u).
+
whenNotMatched(col("A.customerId").isNull.and(col("B.customerId").isNotNull)).
+ insertExpr(insertMap).
+ execute()
+
+ checkAnswer(sql("select count(*) from customers"), Seq(Row(6)))
+ checkAnswer(sql("select count(*) from customers where current='true'"),
Seq(Row(4)))
+ checkAnswer(sql("select count(*) from customers where effectivedate is not
null and enddate is not null"), Seq(Row(1)))
+
+ }
+
+ test("check the ccd ") {
+ sql("drop table if exists target")
+
+ val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "0"),
+ Row("b", "1"),
+ Row("c", "2"),
+ Row("d", "3")
+ ).asJava, StructType(Seq(StructField("key", StringType),
StructField("value", StringType))))
+
+ initframe.write
+ .format("carbondata")
+ .option("tableName", "target")
+ .mode(SaveMode.Overwrite)
+ .save()
+ val target = sqlContext.read.format("carbondata").option("tableName",
"target").load()
+ var ccd =
+ sqlContext.sparkSession.createDataFrame(Seq(
+ Row("a", "10", false, 0),
+ Row("a", null, true, 1), // a was updated and then deleted
+ Row("b", null, true, 2), // b was just deleted once
+ Row("c", null, true, 3), // c was deleted and then updated twice
+ Row("c", "20", false, 4),
+ Row("c", "200", false, 5),
+ Row("e", "100", false, 6) // new key
+ ).asJava,
+ StructType(Seq(StructField("key", StringType),
+ StructField("newValue", StringType),
+ StructField("deleted", BooleanType), StructField("time",
IntegerType))))
+ ccd.createOrReplaceTempView("changes")
+
+ ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as
deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM
changes GROUP BY key)")
+
+ val updateMap = Map("key" -> "B.key", "value" ->
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+ val insertMap = Map("key" -> "B.key", "value" ->
"B.newValue").asInstanceOf[Map[Any, Any]]
+
+ target.as("A").merge(ccd.as("B"), "A.key=B.key").
+ whenMatched("B.deleted=false").
+ updateExpr(updateMap).
+ whenNotMatched("B.deleted=false").
+ insertExpr(insertMap).
+ whenMatched("B.deleted=true").
+ delete().execute()
+ checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
+ checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"),
Row("d", "3"), Row("e", "100")))
+ }
+
+ override def afterAll {
+ sql("drop table if exists order")
+ }
+}
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
new file mode 100644
index 0000000..ea32bdf
--- /dev/null
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Partition
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datamap.DataMapFilter
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
TableInfo}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.util.DataTypeConverter
+import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.spark.InitInputMetrics
+
+/**
+ * It gets the deleted/updated records of a particular update version. It is useful to
+ * retrieve the records changed by a particular update transaction.
+ */
+class CarbonDeltaRowScanRDD[T: ClassTag](
+ @transient private val spark: SparkSession,
+ @transient private val serializedTableInfo: Array[Byte],
+ @transient private val tableInfo: TableInfo,
+ @transient override val partitionNames: Seq[PartitionSpec],
+ override val columnProjection: CarbonProjection,
+ var filter: DataMapFilter,
+ identifier: AbsoluteTableIdentifier,
+ inputMetricsStats: InitInputMetrics,
+ override val dataTypeConverterClz: Class[_ <: DataTypeConverter] =
+ classOf[SparkDataTypeConverterImpl],
+ override val readSupportClz: Class[_ <: CarbonReadSupport[_]] =
+ SparkReadSupport.readSupportClass,
+ deltaVersionToRead: String) extends
+ CarbonScanRDD[T](
+ spark,
+ columnProjection,
+ filter,
+ identifier,
+ serializedTableInfo,
+ tableInfo,
+ inputMetricsStats,
+ partitionNames,
+ dataTypeConverterClz,
+ readSupportClz) {
+ override def internalGetPartitions: Array[Partition] = {
+ val table = CarbonTable.buildFromTableInfo(getTableInfo)
+ val updateStatusManager = new SegmentUpdateStatusManager(table,
deltaVersionToRead)
+
+ val parts = super.internalGetPartitions
+ parts.map { p =>
+ val partition = p.asInstanceOf[CarbonSparkPartition]
+ val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s =>
+ updateStatusManager.getDetailsForABlock(
+ CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId,
s.getBlockPath)) != null
+ }.asJava
+ new CarbonSparkPartition(partition.rddId, partition.index,
+ new CarbonMultiBlockSplit(splits,
partition.multiBlockSplit.getLocations))
+ }.filter(p => p.multiBlockSplit.getAllSplits.size() >
0).asInstanceOf[Array[Partition]]
+ }
+
+ override def createInputFormat(conf: Configuration):
CarbonTableInputFormat[Object] = {
+ val format = super.createInputFormat(conf)
+ conf.set("updateDeltaVersion", deltaVersionToRead)
+ conf.set("readDeltaOnly", "true")
+ format
+ }
+}
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a6dfc8a..5d75742 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -638,7 +638,7 @@ class CarbonScanRDD[T: ClassTag](
}
- private def createInputFormat(conf: Configuration):
CarbonTableInputFormat[Object] = {
+ def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] =
{
val format = new CarbonTableInputFormat[Object]
CarbonInputFormat.setTablePath(conf,
identifier.appendWithLocalPrefix(identifier.getTablePath))
diff --git
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index ca35adc..62e52d0 100644
---
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -206,7 +206,7 @@ object DistributionUtil {
* @param sparkContext
* @return
*/
- private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
+ def getConfiguredExecutors(sparkContext: SparkContext): Int = {
var confExecutors: Int = 0
if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled",
false)) {
// default value for spark.dynamicAllocation.maxExecutors is infinity
diff --git
a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
new file mode 100644
index 0000000..a76c79d
--- /dev/null
+++
b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.readsupport;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import
org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+
+public class SparkGenericRowReadSupportImpl extends
DictionaryDecodeReadSupport<Row> {
+
+ @Override
+ public void initialize(CarbonColumn[] carbonColumns,
+ CarbonTable carbonTable) throws IOException {
+ super.initialize(carbonColumns, carbonTable);
+ }
+
+ @Override
+ public Row readRow(Object[] data) {
+ assert (data.length == dictionaries.length);
+ for (int i = 0; i < dictionaries.length; i++) {
+ if (dictionaries[i] != null) {
+ data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+ }
+ if (dataTypes[i] == DataTypes.DATE) {
+ Calendar c = Calendar.getInstance();
+ c.setTime(new Date(0));
+ c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+ data[i] = new Date(c.getTime().getTime());
+ } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
+ data[i] = new Timestamp((long) data[i] / 1000);
+ }
+ }
+ return new GenericRow(data);
+ }
+}
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 233ad4d..bf5bc40 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hdfs.DFSClient.Conf
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession
@@ -33,10 +32,9 @@ import
org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import
org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope,
TableStatusReadCommittedScope}
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext,
OperationListenerBus}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
object DistributedRDDUtils {
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index deefcd1..063eaf5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
+import
org.apache.spark.sql.execution.command.mutation.merge.MergeDataSetBuilder
import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.profiler.{Profiler, SQLStart}
@@ -283,6 +284,16 @@ object CarbonSession {
}
}
+ implicit class DataSetMerge(val ds: Dataset[Row]) {
+ def merge(srcDS: Dataset[Row], expr: String): MergeDataSetBuilder = {
+ new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
+ }
+
+ def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = {
+ new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession)
+ }
+ }
+
def threadSet(key: String, value: String): Unit = {
var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (currentThreadSessionInfo == null) {
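
The implicit DataSetMerge class added above is what enables the fluent merge API on any Dataset[Row]. A minimal usage sketch, assuming the implicit is imported from CarbonSession and that the table/column names (id, value, aliases A and B) are hypothetical:

    import org.apache.spark.sql.{Dataset, Row}
    import org.apache.spark.sql.CarbonSession._

    // targetDS must be backed by a carbon table; sourceDS can be any dataset.
    def upsert(targetDS: Dataset[Row], sourceDS: Dataset[Row]): Unit = {
      targetDS.as("A")
        .merge(sourceDS.as("B"), "A.id = B.id")
        .whenMatched("A.value <> B.value")
        .updateExpr(Map("value" -> "B.value"))
        .whenNotMatched()
        .insertExpr(Map("id" -> "B.id", "value" -> "B.value"))
        .whenNotMatchedAndExistsOnlyOnTarget()
        .delete()
        .execute()
    }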
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index ecb74dc..d744e96 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil,
DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
-import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.mutate.data.{BlockMappingVO,
RowCountDetailsVO}
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil,
ThreadLocalSessionInfo}
@@ -55,35 +55,71 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
object DeleteExecution {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ def deleteDeltaExecution(
+ databaseNameOp: Option[String],
+ tableName: String,
+ sparkSession: SparkSession,
+ dataRdd: RDD[Row],
+ timestamp: String,
+ isUpdateOperation: Boolean,
+ executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
+
+ val (res, blockMappingVO) = deleteDeltaExecutionInternal(databaseNameOp,
+ tableName, sparkSession, dataRdd, timestamp, isUpdateOperation,
executorErrors)
+ var segmentsTobeDeleted = Seq.empty[Segment]
+ var operatedRowCount = 0L
+ // if no loads are present then no need to do anything.
+ if (res.flatten.isEmpty) {
+ return (segmentsTobeDeleted, operatedRowCount)
+ }
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp,
tableName)(sparkSession)
+ // update new status file
+ segmentsTobeDeleted =
+ checkAndUpdateStatusFiles(executorErrors,
+ res, carbonTable, timestamp,
+ blockMappingVO, isUpdateOperation)
+
+ if (executorErrors.failureCauses == FailureCauses.NONE) {
+ operatedRowCount = res.flatten.map(_._2._3).sum
+ }
+ (segmentsTobeDeleted, operatedRowCount)
+ }
+
/**
* generate the delete delta files in each segment as per the RDD.
* @return it gives the segments which needs to be deleted.
*/
- def deleteDeltaExecution(
+ def deleteDeltaExecutionInternal(
databaseNameOp: Option[String],
tableName: String,
sparkSession: SparkSession,
dataRdd: RDD[Row],
timestamp: String,
isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
+ executorErrors: ExecutionErrors,
+ tupleId: Option[Int] = None):
+ (Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors,
Long))]], BlockMappingVO) = {
var res: Array[List[(SegmentStatus, (SegmentUpdateDetails,
ExecutionErrors, Long))]] = null
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp,
tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val tablePath = absoluteTableIdentifier.getTablePath
- var segmentsTobeDeleted = Seq.empty[Segment]
- var operatedRowCount = 0L
val deleteRdd = if (isUpdateOperation) {
val schema =
org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
org.apache.spark.sql.types.StringType)))
- val rdd = dataRdd
- .map(row => Row(row.get(row.fieldIndex(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+ val rdd = tupleId match {
+ case Some(id) =>
+ dataRdd
+ .map(row => Row(row.get(id)))
+ case _ =>
+ dataRdd
+ .map(row => Row(row.get(row.fieldIndex(
+ CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
+ }
sparkSession.createDataFrame(rdd, schema).rdd
} else {
dataRdd
@@ -91,16 +127,26 @@ object DeleteExecution {
val (carbonInputFormat, job) =
createCarbonInputFormat(absoluteTableIdentifier)
CarbonInputFormat.setTableInfo(job.getConfiguration,
carbonTable.getTableInfo)
- val keyRdd = deleteRdd.map({ row =>
- val tupleId: String = row
-
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
- (key, row)
- }).groupByKey()
+ val keyRdd = tupleId match {
+ case Some(id) =>
+ deleteRdd.map { row =>
+ val tupleId: String = row.getString(id)
+ val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+ (key, row)
+ }.groupByKey()
+ case _ =>
+ deleteRdd.map { row =>
+ val tupleId: String = row
+
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+ val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+ (key, row)
+ }.groupByKey()
+ }
// if no loads are present then no need to do anything.
if (keyRdd.partitions.length == 0) {
- return (segmentsTobeDeleted, operatedRowCount)
+ return (Array.empty[List[(SegmentStatus,
+ (SegmentUpdateDetails, ExecutionErrors, Long))]], null)
}
val blockMappingVO =
carbonInputFormat.getBlockRowCount(
@@ -144,76 +190,6 @@ object DeleteExecution {
result
}).collect()
- // if no loads are present then no need to do anything.
- if (res.flatten.isEmpty) {
- return (segmentsTobeDeleted, operatedRowCount)
- }
-
- // update new status file
- checkAndUpdateStatusFiles()
-
- // all or none : update status file, only if complete delete opeartion is
successfull.
- def checkAndUpdateStatusFiles(): Unit = {
- val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
- val segmentDetails = new util.HashSet[Segment]()
- res.foreach(resultOfSeg => resultOfSeg.foreach(
- resultOfBlock => {
- if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
- blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
- // if this block is invalid then decrement block count in map.
- if
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
- CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
- blockMappingVO.getSegmentNumberOfBlockMapping)
- }
- } else {
- // In case of failure , clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- val errorMsg =
- "Delete data operation is failed due to failure in creating
delete delta file for " +
- "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
- resultOfBlock._2._1.getBlockName
- executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
- executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-
- if (executorErrors.failureCauses == FailureCauses.NONE) {
- executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- executorErrors.errorMsg = errorMsg
- }
- LOGGER.error(errorMsg)
- return
- }
- }
- )
- )
-
- val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
-
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
-
- segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
-
- // this is delete flow so no need of putting timestamp in the status
file.
- if (CarbonUpdateUtil
- .updateSegmentStatus(blockUpdateDetailsList, carbonTable,
timestamp, false) &&
- CarbonUpdateUtil
- .updateTableMetadataStatus(segmentDetails,
- carbonTable,
- timestamp,
- !isUpdateOperation,
- listOfSegmentToBeMarkedDeleted)
- ) {
- LOGGER.info(s"Delete data operation is successful for ${ database }.${
tableName }")
- } else {
- // In case of failure , clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- val errorMessage = "Delete data operation is failed due to failure " +
- "in table status updation."
- LOGGER.error("Delete data operation is failed due to failure in table
status updation.")
- executorErrors.failureCauses =
FailureCauses.STATUS_FILE_UPDATION_FAILURE
- executorErrors.errorMsg = errorMessage
- }
- }
-
def deleteDeltaFunc(index: Int,
key: String,
iter: Iterator[Row],
@@ -322,10 +298,118 @@ object DeleteExecution {
resultIter
}
- if (executorErrors.failureCauses == FailureCauses.NONE) {
- operatedRowCount = res.flatten.map(_._2._3).sum
+ (res, blockMappingVO)
+ }
+
+ // all or none: update the status file only if the complete delete operation is successful.
+ def checkAndUpdateStatusFiles(
+ executorErrors: ExecutionErrors,
+ res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors,
Long))]],
+ carbonTable: CarbonTable,
+ timestamp: String,
+ blockMappingVO: BlockMappingVO,
+ isUpdateOperation: Boolean): Seq[Segment] = {
+ val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+ val segmentDetails = new util.HashSet[Segment]()
+ res.foreach(resultOfSeg => resultOfSeg.foreach(
+ resultOfBlock => {
+ if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
+ blockUpdateDetailsList.add(resultOfBlock._2._1)
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
+ // if this block is invalid then decrement block count in map.
+ if
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
+ CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+ blockMappingVO.getSegmentNumberOfBlockMapping)
+ }
+ } else {
+ // In case of failure, clean all related delete delta files
+ CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+ val errorMsg =
+ "Delete data operation is failed due to failure in creating delete
delta file for " +
+ "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
+ resultOfBlock._2._1.getBlockName
+ executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+ executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+ if (executorErrors.failureCauses == FailureCauses.NONE) {
+ executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+ executorErrors.errorMsg = errorMsg
+ }
+ LOGGER.error(errorMsg)
+ return Seq.empty[Segment]
+ }
+ }))
+
+ val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+ val segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
+
+ // This is the delete flow, so there is no need to put a timestamp in the status file.
+ if (CarbonUpdateUtil
+ .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp,
false) &&
+ CarbonUpdateUtil
+ .updateTableMetadataStatus(segmentDetails,
+ carbonTable,
+ timestamp,
+ !isUpdateOperation,
+ listOfSegmentToBeMarkedDeleted)
+ ) {
+ LOGGER.info(s"Delete data operation is successful for " +
+ s"${ carbonTable.getDatabaseName }.${
carbonTable.getTableName }")
+ } else {
+ // In case of failure, clean all related delete delta files
+ CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+ val errorMessage = "Delete data operation is failed due to failure " +
+ "in table status updation."
+ LOGGER.error("Delete data operation is failed due to failure in table
status updation.")
+ executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
+ executorErrors.errorMsg = errorMessage
}
- (segmentsTobeDeleted, operatedRowCount)
+ segmentsTobeDeleted
+ }
+
+ // Collect the block update details and the segments to be marked deleted; abort on any block failure.
+ def processSegments(executorErrors: ExecutionErrors,
+ res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors,
Long))]],
+ carbonTable: CarbonTable,
+ timestamp: String,
+ blockMappingVO: BlockMappingVO): (util.List[SegmentUpdateDetails],
Seq[Segment]) = {
+ val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
+ val segmentDetails = new util.HashSet[Segment]()
+ res.foreach(resultOfSeg => resultOfSeg.foreach(
+ resultOfBlock => {
+ if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
+ blockUpdateDetailsList.add(resultOfBlock._2._1)
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
+ // if this block is invalid then decrement block count in map.
+ if
(CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
+ CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
+ blockMappingVO.getSegmentNumberOfBlockMapping)
+ }
+ } else {
+ // In case of failure, clean all related delete delta files
+ CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+ val errorMsg =
+ "Delete data operation is failed due to failure in creating delete
delta file for " +
+ "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
+ resultOfBlock._2._1.getBlockName
+ executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+ executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
+
+ if (executorErrors.failureCauses == FailureCauses.NONE) {
+ executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+ executorErrors.errorMsg = errorMsg
+ }
+ LOGGER.error(errorMsg)
+ return (blockUpdateDetailsList, Seq.empty[Segment])
+ }
+ }))
+
+ val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
+
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
+
+ (blockUpdateDetailsList, listOfSegmentToBeMarkedDeleted.asScala)
}
/**
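
With the refactor above, the original delete flow is split so that the merge command can reuse it: deleteDeltaExecutionInternal produces the per-block results plus the BlockMappingVO, and either checkAndUpdateStatusFiles (classic delete) or processSegments (merge, which defers the status update) consumes them. A rough sketch of the classic path built only from the methods in this file; it omits the locking and cleanup that the real delete command performs:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.ExecutionErrors
    import org.apache.spark.sql.execution.command.mutation.DeleteExecution
    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.processing.loading.FailureCauses

    def deleteRows(db: Option[String], table: String, spark: SparkSession, rows: RDD[Row],
        timestamp: String, carbonTable: CarbonTable): Seq[Segment] = {
      val errors = ExecutionErrors(FailureCauses.NONE, "")
      // Generate the delete delta files and collect per-block results.
      val (res, blockMappingVO) = DeleteExecution.deleteDeltaExecutionInternal(
        db, table, spark, rows, timestamp, false, errors)
      if (res.flatten.isEmpty) {
        return Seq.empty
      }
      // Classic delete: update the status files immediately.
      // The merge path calls DeleteExecution.processSegments(...) here instead.
      DeleteExecution.checkAndUpdateStatusFiles(
        errors, res, carbonTable, timestamp, blockMappingVO, false)
    }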
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
new file mode 100644
index 0000000..3c0acc6
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.sql.{AnalysisException,
CarbonDatasourceHadoopRelation, Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata,
LongAccumulator}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel,
CarbonLoadModelBuilder}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * This command will merge the data of source dataset to target dataset backed
by carbon table.
+ * @param targetDsOri Target dataset to merge the data. This dataset should be
backed by carbontable
+ * @param srcDS Source dataset, it can be any data.
+ * @param mergeMatches It contains the join condition and list match
conditions to apply.
+ */
+case class CarbonMergeDataSetCommand(
+ targetDsOri: Dataset[Row],
+ srcDS: Dataset[Row],
+ var mergeMatches: MergeDataSetMatches)
+ extends DataCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ /**
+ * It merges the data of the source dataset into the target dataset backed by a carbon table.
+ * Basically it performs a full outer join between source and target and applies the given
+ * conditions as a "case when" expression to derive a status for each row. The status can be
+ * insert/update/delete. It can also insert the history (update/delete) data into a history table.
+ *
+ */
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val rltn = collectCarbonRelation(targetDsOri.logicalPlan)
+ // Target dataset must be backed by carbondata table.
+ if (rltn.length != 1) {
+ throw new UnsupportedOperationException(
+ "Carbon table supposed to be present in merge dataset")
+ }
+ // validate the merge matches and actions.
+ validateMergeActions(mergeMatches, targetDsOri, sparkSession)
+ val carbonTable = rltn.head.carbonRelation.carbonTable
+ val hasDelAction = mergeMatches.matchList
+ .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
+ val hasUpdateAction = mergeMatches.matchList
+ .exists(_.getActions.exists(_.isInstanceOf[UpdateAction]))
+ val (insertHistOfUpdate, insertHistOfDelete) =
getInsertHistoryStatus(mergeMatches)
+ // Get all the required columns of targetDS by going through all match
conditions and actions.
+ val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches,
sparkSession)
+ // Select only the required columns; it avoids a lot of unnecessary shuffling.
+ val targetDs = targetDsOri.select(columns: _*)
+ // Fill the update mapping with the remaining columns. From here on the system assumes
+ // all mappings exist.
+ mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs)
+ // Generate all condition combinations as one column and add it as 'status'.
+ val condition = generateStatusColumnWithAllCombinations(mergeMatches)
+
+ // Add the tupleid udf to get the tupleid to generate delete delta.
+ val frame =
targetDs.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+ expr("getTupleId()")).withColumn("exist_on_target", lit(1)).join(
+ srcDS.withColumn("exist_on_src", lit(1)),
+ // Do the full outer join to get the data from both sides without
missing anything.
+ // TODO As per the match conditions choose the join, sometimes it might
be possible to use
+ // left_outer join.
+ mergeMatches.joinExpr, "full_outer").withColumn("status", condition)
+ if (LOGGER.isDebugEnabled) {
+ frame.explain()
+ }
+ val tableCols =
+
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).
+
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+ val builder = new CarbonLoadModelBuilder(carbonTable)
+ val options = Seq(("fileheader", tableCols.mkString(","))).toMap
+ val model = builder.build(options.asJava,
CarbonUpdateUtil.readCurrentTime, "1")
+ model.setLoadWithoutConverterStep(true)
+ val newLoadMetaEntry = new LoadMetadataDetails
+ CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
+ SegmentStatus.INSERT_IN_PROGRESS,
+ model.getFactTimeStamp,
+ false)
+ CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true,
false)
+
+ model.setCsvHeader(tableCols.mkString(","))
+
+ val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map {
m =>
+ m.getActions.map {
+ case u: UpdateAction => MergeProjection(tableCols, frame, rltn.head,
sparkSession, u)
+ case i: InsertAction => MergeProjection(tableCols, frame, rltn.head,
sparkSession, i)
+ case d: DeleteAction => MergeProjection(tableCols, frame, rltn.head,
sparkSession, d)
+ case _ => null
+ }.filter(_ != null)
+ }
+
+ val st = System.currentTimeMillis()
+ // Create accumulators to log the stats
+ val stats = Stats(createLongAccumalator("insertedRows"),
+ createLongAccumalator("updatedRows"),
+ createLongAccumalator("deletedRows"))
+ val processedRDD = processIUD(sparkSession, frame, carbonTable, model,
projections, stats)
+
+ val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
+ val trxMgr = TranxManager(model.getFactTimeStamp)
+
+ val mutationAction = MutationActionFactory.getMutationAction(sparkSession,
+ carbonTable, hasDelAction, hasUpdateAction,
+ insertHistOfUpdate, insertHistOfDelete)
+
+ val tuple = mutationAction.handleAction(processedRDD, executorErrors,
trxMgr)
+
+ // In case user only has insert action.
+ if (!(hasDelAction || hasUpdateAction)) {
+ processedRDD.count()
+ }
+ LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
+ LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
+ LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
+ LOGGER.info(
+ " Time taken to merge data : " + tuple + " :: " +
(System.currentTimeMillis() - st))
+
+ val segment = new Segment(model.getSegmentId,
+ SegmentFileStore.genSegmentFileName(
+ model.getSegmentId,
+ System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
+ CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
+ model.getSegmentId), Map.empty[String, String].asJava)
+ val writeSegment =
+ SegmentFileStore.writeSegmentFile(carbonTable, segment)
+
+ if (writeSegment) {
+ SegmentFileStore.updateTableStatusFile(
+ carbonTable,
+ model.getSegmentId,
+ segment.getSegmentFileName,
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ new SegmentFileStore(carbonTable.getTablePath,
segment.getSegmentFileName),
+ SegmentStatus.SUCCESS)
+ } else {
+ CarbonLoaderUtil.updateTableStatusForFailure(model)
+ }
+
+ if (hasDelAction || hasUpdateAction) {
+ if (CarbonUpdateUtil.updateSegmentStatus(tuple._1, carbonTable,
+ trxMgr.getLatestTrx.toString, false) &&
+ CarbonUpdateUtil
+ .updateTableMetadataStatus(
+ model.getLoadMetadataDetails.asScala.map(l =>
+ new Segment(l.getMergedLoadName,
+ l.getSegmentFile)).toSet.asJava,
+ carbonTable,
+ trxMgr.getLatestTrx.toString,
+ true,
+ tuple._2.asJava)) {
+ LOGGER.info(s"Merge data operation is successful for " +
+ s"${ carbonTable.getDatabaseName }.${
carbonTable.getTableName }")
+ } else {
+ throw new CarbonMergeDataSetException("Saving update status or table
status failed")
+ }
+ }
+ // Load the history table if the insert-into-history-table action is added by the user.
+ HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head,
carbonTable,
+ trxMgr, mutationAction, mergeMatches)
+ Seq.empty
+ }
+
+ /**
+ * Depending on the status of the row, it either inserts the data or updates/deletes it.
+ */
+ private def processIUD(sparkSession: SparkSession,
+ frame: DataFrame,
+ carbonTable: CarbonTable,
+ model: CarbonLoadModel,
+ projections: Seq[Seq[MergeProjection]],
+ stats: Stats) = {
+ val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+ val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext,
conf)
+ val frameCols = frame.queryExecution.analyzed.output
+ val status = frameCols.length - 1
+ val tupleId = frameCols.zipWithIndex
+
.find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2
+ val insertedRows = stats.insertedRows
+ val updatedRows = stats.updatedRows
+ val deletedRows = stats.deletedRows
+
frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
+ mapPartitionsWithIndex { case (index, iter) =>
+ val confB = config.value.value
+ CarbonTableOutputFormat.setCarbonTable(confB, carbonTable)
+ model.setTaskNo(index.toString)
+ CarbonTableOutputFormat.setLoadModel(confB, model)
+ val jobId = new JobID(UUID.randomUUID.toString, 0)
+ val task = new TaskID(jobId, TaskType.MAP, index)
+ val attemptID = new TaskAttemptID(task, index)
+ val context = new TaskAttemptContextImpl(confB, attemptID)
+ val writer = new CarbonTableOutputFormat().getRecordWriter(context)
+ val writable = new ObjectArrayWritable
+ val projLen = projections.length
+ val schema =
+ org.apache.spark.sql.types.StructType(Seq(
+ StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
StringType),
+ StructField("status", IntegerType)))
+ new Iterator[Row] {
+ override def hasNext: Boolean = {
+ if (iter.hasNext) {
+ true
+ } else {
+ writer.close(context)
+ false
+ }
+ }
+
+ override def next(): Row = {
+ val row = iter.next()
+ val rowWithSchema = row.asInstanceOf[GenericRowWithSchema]
+ val is = row.get(status)
+ var isUpdate = false
+ var isDelete = false
+ var insertedCount = 0
+ if (is != null) {
+ val isInt = is.asInstanceOf[Int]
+ var i = 0;
+ while (i < projLen) {
+ if ((isInt & (1 << i)) == (1 << i)) {
+ projections(i).foreach { p =>
+ if (!p.isDelete) {
+ if (p.isUpdate) {
+ isUpdate = p.isUpdate
+ }
+ writable.set(p(rowWithSchema))
+ writer.write(NullWritable.get(), writable)
+ insertedCount += 1
+ } else {
+ isDelete = true
+ }
+ }
+ }
+ i = i + 1
+ }
+ }
+ val newArray = new Array[Any](2)
+ newArray(0) = row.getString(tupleId)
+ if (isUpdate && isDelete) {
+ newArray(1) = 102
+ updatedRows.add(1)
+ deletedRows.add(1)
+ insertedCount -= 1
+ } else if (isUpdate) {
+ updatedRows.add(1)
+ newArray(1) = 101
+ insertedCount -= 1
+ } else if (isDelete) {
+ newArray(1) = 100
+ deletedRows.add(1)
+ } else {
+ newArray(1) = is
+ }
+ insertedRows.add(insertedCount)
+ new GenericRowWithSchema(newArray, schema)
+ }
+ }
+ }.cache()
+ }
+
+ private def createLongAccumalator(name: String) = {
+ val acc = new LongAccumulator
+ acc.setValue(0)
+ acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name),
false)
+ AccumulatorContext.register(acc)
+ acc
+ }
+
+ /**
+ * It generates conditions for all possible scenarios and adds an integer code for each match.
+ * One row can match more than one condition, in which case the actions of all matching
+ * conditions are applied to that row.
+ * For example:
+ * whenMatched(a=c1)
+ * update()
+ * whenNotMatched(b=d1)
+ * insert()
+ * whenNotMatched(b=d2)
+ * insert()
+ *
+ * The above merge statement will be converted to
+ * (case when a=c1 and b=d1 and b=d2 then 7
+ * when a=c1 and b=d1 then 6
+ * when a=c1 and b=d2 then 5
+ * when a=c1 then 4
+ * when b=d1 and b=d2 then 3
+ * when b=d1 then 2
+ * when b=d2 then 1) as status
+ *
+ * So it is not recommended to use many merge conditions, as the number of case-when
+ * branches grows exponentially.
+ *
+ * @param mergeMatches
+ * @return
+ */
+ def generateStatusColumnWithAllCombinations(mergeMatches:
MergeDataSetMatches): Column = {
+ var exprList = new ArrayBuffer[(Column, Int)]()
+ val matchList = mergeMatches.matchList
+ val len = matchList.length
+ val N = Math.pow(2d, len.toDouble).toInt
+ var i = 1
+ while (i < N) {
+ var status = 0
+ var column: Column = null
+ val code = Integer.toBinaryString(N | i).substring(1)
+ var j = 0
+ while (j < len) {
+ if (code.charAt(j) == '1') {
+ val mergeMatch = matchList(j)
+ if (column == null) {
+ if (mergeMatch.getExp.isDefined) {
+ column = mergeMatch.getExp.get
+ }
+ } else {
+ if (mergeMatch.getExp.isDefined) {
+ column = column.and(mergeMatch.getExp.get)
+ }
+ }
+ mergeMatch match {
+ case wm: WhenMatched =>
+ val existsOnBoth = col("exist_on_target").isNotNull.and(
+ col("exist_on_src").isNotNull)
+ column = if (column == null) {
+ existsOnBoth
+ } else {
+ column.and(existsOnBoth)
+ }
+ case wnm: WhenNotMatched =>
+ val existsOnSrc = col("exist_on_target").isNull.and(
+ col("exist_on_src").isNotNull)
+ column = if (column == null) {
+ existsOnSrc
+ } else {
+ column.and(existsOnSrc)
+ }
+ case wnm: WhenNotMatchedAndExistsOnlyOnTarget =>
+ val existsOnSrc = col("exist_on_target").isNotNull.and(
+ col("exist_on_src").isNull)
+ column = if (column == null) {
+ existsOnSrc
+ } else {
+ column.and(existsOnSrc)
+ }
+ case _ =>
+ }
+ status = status | 1 << j
+ }
+ j += 1
+ }
+ if (column == null) {
+ column = lit(true) === lit(true)
+ }
+ exprList += ((column, status))
+ i += 1
+ }
+ exprList = exprList.reverse
+ var condition: Column = null
+ exprList.foreach { case (col, status) =>
+ if (condition == null) {
+ condition = when(col, lit(status))
+ } else {
+ condition = condition.when(col, lit(status))
+ }
+ }
+ condition.otherwise(lit(null))
+ }
+
+ private def getSelectExpressionsOnExistingDF(existingDs: Dataset[Row],
+ mergeMatches: MergeDataSetMatches, sparkSession: SparkSession):
Seq[Column] = {
+ var projects = Seq.empty[Attribute]
+ val existAttrs = existingDs.queryExecution.analyzed.output
+ projects ++= selectAttributes(mergeMatches.joinExpr.expr, existingDs,
sparkSession)
+ mergeMatches.matchList.foreach { m =>
+ if (m.getExp.isDefined) {
+ projects ++= selectAttributes(m.getExp.get.expr, existingDs,
sparkSession)
+ }
+ m.getActions.foreach {
+ case u: UpdateAction =>
+ projects ++= existAttrs.filterNot { f =>
+ u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name))
+ }
+ case i: InsertAction =>
+ if (!existAttrs.forall(f => i.insertMap
+ .exists(_._1.toString().equalsIgnoreCase(f.name)))) {
+ throw new CarbonMergeDataSetException(
+ "Not all source columns are mapped for insert action " +
i.insertMap)
+ }
+ i.insertMap.foreach { case (k, v) =>
+ projects ++= selectAttributes(v.expr, existingDs, sparkSession)
+ }
+ case _ =>
+ }
+ }
+ projects.map(_.name.toLowerCase).distinct.map { p =>
+ existingDs.col(p)
+ }
+ }
+
+ private def updateMappingIfNotExists(mergeMatches: MergeDataSetMatches,
+ existingDs: Dataset[Row]): MergeDataSetMatches = {
+ val existAttrs = existingDs.queryExecution.analyzed.output
+ val updateCommand = mergeMatches.matchList.map { m =>
+ val updateAction = m.getActions.map {
+ case u: UpdateAction =>
+ if (u.updateMap.isEmpty) {
+ throw new CarbonMergeDataSetException(
+ "At least one column supposed to be updated for update action")
+ }
+ val attributes = existAttrs.filterNot { f =>
+ u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name))
+ }
+ val newMap = attributes.map(a => (existingDs.col(a.name),
existingDs.col(a.name))).toMap
+ u.copy(u.updateMap ++ newMap)
+ case other => other
+ }
+ m.updateActions(updateAction)
+ }
+ mergeMatches.copy(matchList =
+
updateCommand.filterNot(_.getActions.exists(_.isInstanceOf[DeleteAction]))
+ ++
updateCommand.filter(_.getActions.exists(_.isInstanceOf[DeleteAction])))
+ }
+
+ private def selectAttributes(expression: Expression, existingDs:
Dataset[Row],
+ sparkSession: SparkSession, throwError: Boolean = false) = {
+ expression.collect {
+ case a: Attribute =>
+ val resolved = existingDs.queryExecution
+ .analyzed.resolveQuoted(a.name,
sparkSession.sessionState.analyzer.resolver)
+ if (resolved.isDefined) {
+ resolved.get.toAttribute
+ } else if (throwError) {
+ throw new CarbonMergeDataSetException(
+ expression + " cannot be resolved with dataset " + existingDs)
+ } else {
+ null
+ }
+ }.filter(_ != null)
+ }
+
+ private def collectCarbonRelation(plan: LogicalPlan):
Seq[CarbonDatasourceHadoopRelation] = {
+ plan collect {
+ case l: LogicalRelation if
l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ }
+ }
+
+ private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = {
+ val insertHistOfUpdate = mergeMatches.matchList.exists(p =>
+ p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+ && p.getActions.exists(_.isInstanceOf[UpdateAction]))
+ val insertHistOfDelete = mergeMatches.matchList.exists(p =>
+ p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+ && p.getActions.exists(_.isInstanceOf[DeleteAction]))
+ (insertHistOfUpdate, insertHistOfDelete)
+ }
+
+ private def validateMergeActions(mergeMatches: MergeDataSetMatches,
+ existingDs: Dataset[Row], sparkSession: SparkSession): Unit = {
+ val existAttrs = existingDs.queryExecution.analyzed.output
+ if (mergeMatches.matchList.exists(m =>
m.getActions.exists(_.isInstanceOf[DeleteAction])
+ &&
m.getActions.exists(_.isInstanceOf[UpdateAction]))) {
+ throw new AnalysisException(
+ "Delete and update action should not be under same merge condition")
+ }
+ if (mergeMatches.matchList.count(m =>
m.getActions.exists(_.isInstanceOf[DeleteAction])) > 1) {
+ throw new AnalysisException("Delete action should not be more than once
across merge")
+ }
+ mergeMatches.matchList.foreach { f =>
+ if (f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])) {
+ if (!(f.getActions.exists(_.isInstanceOf[UpdateAction]) ||
+ f.getActions.exists(_.isInstanceOf[DeleteAction]))) {
+ throw new AnalysisException("For inserting to history table, " +
+ "it should be along with either update or delete
action")
+ }
+ val value =
f.getActions.find(_.isInstanceOf[InsertInHistoryTableAction]).get.
+ asInstanceOf[InsertInHistoryTableAction]
+ if (!existAttrs.forall(f => value.insertMap
+ .exists(_._1.toString().equalsIgnoreCase(f.name)))) {
+ throw new AnalysisException(
+ "Not all source columns are mapped for insert action " +
value.insertMap)
+ }
+ value.insertMap.foreach { case (k, v) =>
+ selectAttributes(v.expr, existingDs, sparkSession, true)
+ }
+ }
+ }
+ }
+
+ override protected def opName: String = "MERGE DATASET"
+}
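
The 'status' column produced by generateStatusColumnWithAllCombinations is a bitmask over the match conditions, and processIUD then collapses the per-row flags into the codes consumed downstream (100 = delete only, 101 = update only, 102 = both). A small sketch of that decoding, under those assumptions; the helper names are illustrative:

    // Which match conditions fired for a row, given the bitmask 'status'.
    def matchedConditions(status: Int, conditionCount: Int): Seq[Int] =
      (0 until conditionCount).filter(i => (status & (1 << i)) == (1 << i))

    // Collapse per-row flags into the codes the mutation actions filter on.
    def mutationCode(isUpdate: Boolean, isDelete: Boolean): Option[Int] = (isUpdate, isDelete) match {
      case (true, true)  => Some(102) // row hit by both an update and a delete action
      case (true, false) => Some(101) // update only
      case (false, true) => Some(100) // delete only
      case _             => None      // plain insert, no delta needed
    }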
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
new file mode 100644
index 0000000..be59b9a
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+/**
+ * Exception during merge operation.
+ */
+class CarbonMergeDataSetException(msg: String, exception: Throwable)
+ extends Exception(msg, exception) {
+
+ def this(exception: Throwable) {
+ this("", exception)
+ }
+
+ def this(msg: String) {
+ this(msg, null)
+ }
+
+}
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
new file mode 100644
index 0000000..6ef7c37
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row,
SparkSession}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.DataTypeConverterImpl
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.spark.rdd.CarbonDeltaRowScanRDD
+import org.apache.carbondata.spark.readsupport.SparkGenericRowReadSupportImpl
+
+object HistoryTableLoadHelper {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ /**
+ * Loads the history table by reading data from the target table using the last transactions.
+ * Here we read the deleted/updated data from the target table through the delta files and
+ * load it into the history table.
+ */
+ def loadHistoryTable(sparkSession: SparkSession,
+ rltn: CarbonDatasourceHadoopRelation,
+ carbonTable: CarbonTable,
+ trxMgr: TranxManager,
+ mutationAction: MutationAction,
+ mergeMatches: MergeDataSetMatches): Unit = {
+ if (!mutationAction.isInstanceOf[HandleUpdateAndDeleteAction]) {
+ val insert = mergeMatches
+ .matchList
+ .filter { f =>
+ f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
+ }
+ .head
+ .getActions
+ .find(_.isInstanceOf[InsertInHistoryTableAction])
+ .get
+ .asInstanceOf[InsertInHistoryTableAction]
+ // Get the history table dataframe.
+ val histDataFrame: Dataset[Row] = sparkSession.table(insert.historyTable)
+ // check if the user wants to insert update history records into history
table.
+ val updateDataFrame = if (trxMgr.getUpdateTrx != -1) {
+ // Get the insertHistoryAction related to update action.
+ val insertHist = mergeMatches.matchList.filter { f =>
+ f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) &&
+ f.getActions.exists(_.isInstanceOf[UpdateAction])
+
}.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head.
+ asInstanceOf[InsertInHistoryTableAction]
+ // Create the dataframe to fetch history updated records.
+ Some(createHistoryDataFrame(sparkSession, rltn, carbonTable,
insertHist,
+ histDataFrame, trxMgr.getUpdateTrx))
+ } else {
+ None
+ }
+ // check if the user wants to insert delete history records into history
table.
+ val delDataFrame = if (trxMgr.getDeleteTrx != -1) {
+ val insertHist = mergeMatches.matchList.filter { f =>
+ f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) &&
+ f.getActions.exists(_.isInstanceOf[DeleteAction])
+
}.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head.
+ asInstanceOf[InsertInHistoryTableAction]
+ Some(createHistoryDataFrame(sparkSession, rltn, carbonTable,
insertHist,
+ histDataFrame: Dataset[Row], trxMgr.getDeleteTrx))
+ } else {
+ None
+ }
+
+ val unionDf = (updateDataFrame, delDataFrame) match {
+ case (Some(u), Some(d)) => u.union(d)
+ case (Some(u), None) => u
+ case (None, Some(d)) => d
+ case _ => throw new CarbonMergeDataSetException("Something is wrong")
+ }
+
+ val alias = carbonTable.getTableName + System.currentTimeMillis()
+ unionDf.createOrReplaceTempView(alias)
+ val start = System.currentTimeMillis()
+ sparkSession.sql(s"insert into ${ insert.historyTable.quotedString } " +
+ s"select * from ${ alias }")
+ LOGGER.info("Time taken to insert into history table " +
(System.currentTimeMillis() - start))
+ }
+ }
+
+ /**
+ * It creates the dataframe to fetch deleted/updated records in the
particular transaction.
+ */
+ private def createHistoryDataFrame(sparkSession: SparkSession,
+ rltn: CarbonDatasourceHadoopRelation,
+ carbonTable: CarbonTable,
+ insertHist: InsertInHistoryTableAction,
+ histDataFrame: Dataset[Row],
+ factTimestamp: Long) = {
+ val rdd1 = new CarbonDeltaRowScanRDD[Row](sparkSession,
+ carbonTable.getTableInfo.serialize(),
+ carbonTable.getTableInfo,
+ null,
+ new CarbonProjection(
+ carbonTable.getCreateOrderColumn().asScala.map(_.getColName).toArray),
+ null,
+ carbonTable.getAbsoluteTableIdentifier,
+ new CarbonInputMetrics,
+ classOf[DataTypeConverterImpl],
+ classOf[SparkGenericRowReadSupportImpl],
+ factTimestamp.toString)
+
+ val frame1 = sparkSession.createDataFrame(rdd1, rltn.carbonRelation.schema)
+ val histOutput = histDataFrame.queryExecution.analyzed.output
+ val cols = histOutput.map { a =>
+ insertHist.insertMap.find(p => p._1.toString().equalsIgnoreCase(a.name))
match {
+ case Some((k, v)) => v
+ case _ =>
+ throw new CarbonMergeDataSetException(
+ "Not all columns of the history table are mapped in " + insertHist)
+ }
+ }
+ frame1.select(cols: _*)
+ }
+
+}
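
createHistoryDataFrame maps every history-table column through the insertMap supplied with the insert-into-history-table action; any unmapped column raises CarbonMergeDataSetException. A minimal sketch of the shape such a mapping takes (the column names id, value and op_type are hypothetical):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{col, lit}

    // Keys name the history-table columns, values are expressions over the target
    // table's columns; every history-table column must appear as a key.
    val historyInsertMap: Map[Column, Column] = Map(
      col("id")      -> col("id"),
      col("value")   -> col("value"),
      col("op_type") -> lit("D")   // e.g. tag deleted rows
    )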
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
new file mode 100644
index 0000000..2525dcd
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, Column, Dataset, Row,
SparkSession}
+import org.apache.spark.sql.functions.expr
+
+/**
+ * Builder class to generate and execute merge
+ */
+class MergeDataSetBuilder(existingDsOri: Dataset[Row], currDs: Dataset[Row],
+ joinExpr: Column, sparkSession: SparkSession) {
+
+ def this(existingDsOri: Dataset[Row], currDs: Dataset[Row],
+ joinExpr: String, sparkSession: SparkSession) {
+ this(existingDsOri, currDs, expr(joinExpr), sparkSession)
+ }
+
+ val matchList: util.List[MergeMatch] = new util.ArrayList[MergeMatch]()
+
+ def whenMatched(): MergeDataSetBuilder = {
+ matchList.add(WhenMatched())
+ this
+ }
+
+ def whenMatched(expression: String): MergeDataSetBuilder = {
+ matchList.add(WhenMatched(Some(expr(expression))))
+ this
+ }
+
+ def whenMatched(expression: Column): MergeDataSetBuilder = {
+ matchList.add(WhenMatched(Some(expression)))
+ this
+ }
+
+ def whenNotMatched(): MergeDataSetBuilder = {
+ matchList.add(WhenNotMatched())
+ this
+ }
+
+ def whenNotMatched(expression: String): MergeDataSetBuilder = {
+ matchList.add(WhenNotMatched(Some(expr(expression))))
+ this
+ }
+
+ def whenNotMatched(expression: Column): MergeDataSetBuilder = {
+ matchList.add(WhenNotMatched(Some(expression)))
+ this
+ }
+
+ def whenNotMatchedAndExistsOnlyOnTarget(): MergeDataSetBuilder = {
+ matchList.add(WhenNotMatchedAndExistsOnlyOnTarget())
+ this
+ }
+
+ def whenNotMatchedAndExistsOnlyOnTarget(expression: String):
MergeDataSetBuilder = {
+ matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expr(expression))))
+ this
+ }
+
+ def whenNotMatchedAndExistsOnlyOnTarget(expression: Column):
MergeDataSetBuilder = {
+ matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expression)))
+ this
+ }
+
+ def updateExpr(expression: Map[Any, Any]): MergeDataSetBuilder = {
+ checkBuilder
+ matchList.get(matchList.size() -
1).addAction(UpdateAction(convertMap(expression)))
+ this
+ }
+
+ def insertExpr(expression: Map[Any, Any]): MergeDataSetBuilder = {
+ checkBuilder
+ matchList.get(matchList.size() -
1).addAction(InsertAction(convertMap(expression)))
+ this
+ }
+
+ def delete(): MergeDataSetBuilder = {
+ checkBuilder
+ matchList.get(matchList.size() - 1).addAction(DeleteAction())
+ this
+ }
+
+ def build(): CarbonMergeDataSetCommand = {
+ checkBuilder
+ CarbonMergeDataSetCommand(existingDsOri, currDs,
+ MergeDataSetMatches(joinExpr, matchList.asScala.toList))
+ }
+
+ def execute(): Unit = {
+ build().run(sparkSession)
+ }
+
+ private def convertMap(exprMap: Map[Any, Any]): Map[Column, Column] = {
+ if (exprMap.exists{ case (k, v) =>
+ !(checkType(k) && checkType(v))
+ }) {
+ throw new AnalysisException(
+ "Expression map should only contain either String or Column " +
exprMap)
+ }
+ def checkType(obj: Any) = obj.isInstanceOf[String] ||
obj.isInstanceOf[Column]
+ def convert(obj: Any) =
+ if (obj.isInstanceOf[Column]) obj.asInstanceOf[Column] else
expr(obj.toString)
+ exprMap.map{ case (k, v) =>
+ (convert(k), convert(v))
+ }
+ }
+
+ private def checkBuilder(): Unit = {
+ if (matchList.size() == 0) {
+ throw new AnalysisException("At least one matcher should be called before calling an action")
+ }
+ }
+
+}
+
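
convertMap above lets updateExpr and insertExpr accept either Strings or Columns for both keys and values. A small sketch of equivalent maps (column names and the B alias are hypothetical):

    import org.apache.spark.sql.functions.{col, expr}

    // Both forms are normalised by convertMap; strings are parsed with expr(...).
    val updateMap: Map[Any, Any] = Map(col("value") -> col("B.value"), "modified" -> "B.modified")
    val insertMap: Map[Any, Any] = Map("id" -> "B.id", "value" -> expr("B.value"))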
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
new file mode 100644
index 0000000..1245bd4
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row,
SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
GenericInternalRow, GenericRowWithSchema, InterpretedMutableProjection,
Projection}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DateType, TimestampType}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Creates the projection for each action such as update, delete or insert.
+ */
+case class MergeProjection(
+ @transient tableCols: Seq[String],
+ @transient ds: Dataset[Row],
+ @transient rltn: CarbonDatasourceHadoopRelation,
+ @transient sparkSession: SparkSession,
+ @transient mergeAction: MergeAction) {
+
+ private val cutOffDate = Integer.MAX_VALUE >> 1
+
+ val isUpdate = mergeAction.isInstanceOf[UpdateAction]
+ val isDelete = mergeAction.isInstanceOf[DeleteAction]
+
+ def apply(row: GenericRowWithSchema): Array[Object] = {
+ // TODO we can avoid these multiple conversions if this is added as a
SparkPlan node.
+ val values = row.toSeq.map {
+ case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+ case d: java.math.BigDecimal =>
org.apache.spark.sql.types.Decimal.apply(d)
+ case b: Array[Byte] =>
org.apache.spark.unsafe.types.UTF8String.fromBytes(b)
+ case d: Date => DateTimeUtils.fromJavaDate(d)
+ case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+ case value => value
+ }
+
+ val outputRow = projection(new GenericInternalRow(values.toArray))
+ .asInstanceOf[GenericInternalRow]
+
+ val array = outputRow.values.clone()
+ var i = 0
+ while (i < array.length) {
+ output(i).dataType match {
+ case d: DateType =>
+ if (array(i) == null) {
+ array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+ } else {
+ array(i) = (array(i).asInstanceOf[Int] + cutOffDate)
+ }
+ case d: TimestampType =>
+ if (array(i) == null) {
+ array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+ } else {
+ array(i) = (array(i).asInstanceOf[Long] / 1000)
+ }
+
+ case _ =>
+ }
+ i += 1
+ }
+ array.asInstanceOf[Array[Object]]
+ }
+
+ val (projection, output) = generateProjection
+
+ private def generateProjection: (Projection, Array[Expression]) = {
+ val existingDsOutput = rltn.carbonRelation.schema.toAttributes
+ val colsMap = mergeAction match {
+ case UpdateAction(updateMap) => updateMap
+ case InsertAction(insertMap) => insertMap
+ case _ => null
+ }
+ if (colsMap != null) {
+ val output = new Array[Expression](tableCols.length)
+ val expecOutput = new Array[Expression](tableCols.length)
+ colsMap.foreach { case (k, v) =>
+ val tableIndex = tableCols.indexOf(k.toString().toLowerCase)
+ if (tableIndex < 0) {
+ throw new CarbonMergeDataSetException(s"Mapping is wrong $colsMap")
+ }
+ output(tableIndex) = v.expr.transform {
+ case a: Attribute if !a.resolved =>
+ ds.queryExecution.analyzed.resolveQuoted(a.name,
+ sparkSession.sessionState.analyzer.resolver).get
+ }
+ expecOutput(tableIndex) =
+
existingDsOutput.find(_.name.equalsIgnoreCase(tableCols(tableIndex))).get
+ }
+ if (output.contains(null)) {
+ throw new CarbonMergeDataSetException(s"Not all columns are mapped")
+ }
+ (new InterpretedMutableProjection(output,
ds.queryExecution.analyzed.output), expecOutput)
+ } else {
+ (null, null)
+ }
+ }
+}
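
The loop in apply above rewrites the projected values into the internal representation the load flow expects; dates become a direct-dictionary surrogate by shifting the epoch-day offset with cutOffDate, and timestamps are reduced from microseconds to milliseconds. A minimal sketch of those conversions, under that reading of the code:

    // Mirrors the constant used in MergeProjection.
    val cutOffDate: Int = Integer.MAX_VALUE >> 1

    // Epoch-day offset -> direct-dictionary surrogate key (assumption based on the code above).
    def toDateSurrogate(daysSinceEpoch: Int): Int = daysSinceEpoch + cutOffDate

    // Microseconds -> milliseconds, as expected by the write path.
    def toMillis(micros: Long): Long = micros / 1000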
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala
new file mode 100644
index 0000000..7410686
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.command.mutation.DeleteExecution
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil,
SegmentUpdateDetails}
+import org.apache.carbondata.processing.loading.FailureCauses
+
+/**
+ * It applies mutations such as update and delete delta onto the store.
+ */
+abstract class MutationAction(sparkSession: SparkSession, carbonTable:
CarbonTable) {
+
+ /**
+ * The RDD of tuple ids and delta statuses is processed here to write the delta to the store.
+ */
+ def handleAction(dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment])
+
+ protected def handle(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ factTimestamp: Long,
+ dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ condition: (Int) => Boolean): (util.List[SegmentUpdateDetails],
Seq[Segment]) = {
+ val update = dataRDD.filter { row =>
+ val status = row.get(1)
+ status != null && condition(status.asInstanceOf[Int])
+ }
+ val tuple1 =
DeleteExecution.deleteDeltaExecutionInternal(Some(carbonTable.getDatabaseName),
+ carbonTable.getTableName,
+ sparkSession, update,
+ factTimestamp.toString,
+ true, executorErrors, Some(0))
+ MutationActionFactory.checkErrors(executorErrors)
+ val tupleProcessed1 = DeleteExecution.processSegments(executorErrors,
tuple1._1, carbonTable,
+ factTimestamp.toString, tuple1._2)
+ MutationActionFactory.checkErrors(executorErrors)
+ tupleProcessed1
+ }
+
+}
+
+/**
+ * It applies the update delta records to the store in one transaction.
+ */
+case class HandleUpdateAction(sparkSession: SparkSession, carbonTable:
CarbonTable)
+ extends MutationAction(sparkSession, carbonTable) {
+
+ override def handleAction(dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) =
{
+ handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+ dataRDD, executorErrors, (status) => (status == 101) || (status == 102))
+ }
+}
+
+/**
+ * It applies the delete delta records to the store in one transaction.
+ */
+case class HandleDeleteAction(sparkSession: SparkSession, carbonTable:
CarbonTable)
+ extends MutationAction(sparkSession, carbonTable) {
+
+ override def handleAction(dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) =
{
+ handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+ dataRDD, executorErrors, (status) => (status == 100) || (status == 102))
+ }
+}
+
+/**
+ * It applies multiple mutations of delta records to the store in multiple transactions.
+ */
+case class MultipleMutationAction(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ mutations: Seq[MutationAction])
+ extends MutationAction(sparkSession, carbonTable) {
+
+ override def handleAction(dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) =
{
+ var (updates: util.List[SegmentUpdateDetails], segs: Seq[Segment]) =
+ (new util.ArrayList[SegmentUpdateDetails], Seq.empty[Segment])
+ mutations.foreach { m =>
+ val (l, r) = m.handleAction(dataRDD, executorErrors, trxMgr)
+ l.asScala.foreach { entry =>
+ CarbonUpdateUtil.mergeSegmentUpdate(false, updates, entry)
+ }
+ segs ++= r
+ }
+ (updates, segs.distinct)
+ }
+}
+
+/**
+ * It applies the delete and update delta records to the store in a single transaction.
+ */
+case class HandleUpdateAndDeleteAction(sparkSession: SparkSession,
carbonTable: CarbonTable)
+ extends MutationAction(sparkSession, carbonTable) {
+
+ override def handleAction(dataRDD: RDD[Row],
+ executorErrors: ExecutionErrors,
+ trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) =
{
+ handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this),
+ dataRDD, executorErrors, (status) => (status == 100) || (status == 101)
|| (status == 102))
+ }
+}
+
+object MutationActionFactory {
+
+ /**
+ * Factory method that creates the appropriate mutation action for update and delete.
+ */
+ def getMutationAction(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ hasDelAction: Boolean,
+ hasUpAction: Boolean,
+ hasInsrtHistUpd: Boolean,
+ hasInsrtHistDel: Boolean): MutationAction = {
+ var actions = Seq.empty[MutationAction]
+ // If the merge has a history insert action, then write the delete delta in two separate
+ // actions, as it is needed to know which records are deleted and which are inserted.
+ if (hasInsrtHistDel || hasInsrtHistUpd) {
+ if (hasUpAction) {
+ actions ++= Seq(HandleUpdateAction(sparkSession, carbonTable))
+ }
+ if (hasDelAction) {
+ actions ++= Seq(HandleDeleteAction(sparkSession, carbonTable))
+ }
+ } else {
+ // If there is no history insert action, then apply it in a single flow.
+ actions ++= Seq(HandleUpdateAndDeleteAction(sparkSession, carbonTable))
+ }
+ if (actions.length == 1) {
+ actions.head
+ } else {
+ // If there are multiple actions to apply, then combine them into a multi action.
+ MultipleMutationAction(sparkSession, carbonTable, actions)
+ }
+ }
+
+ def checkErrors(executorErrors: ExecutionErrors): Unit = {
+ // Check for any failures that occurred during delete delta execution.
+ if (executorErrors.failureCauses != FailureCauses.NONE) {
+ throw new CarbonMergeDataSetException(executorErrors.errorMsg)
+ }
+ }
+}
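
For readers of this patch, here is a minimal sketch of how the factory and actions above are meant to be driven. It is not code from the commit; targetCarbonTable, statusTaggedRDD and executorErrors are assumed to already be in scope (the status column carries the match tag checked against 100/101/102 above):

// Sketch only: pick a combined or split mutation action depending on whether
// history-table inserts are requested, then apply it to the status-tagged rows.
val mutationAction = MutationActionFactory.getMutationAction(
  sparkSession,
  targetCarbonTable,        // assumed: CarbonTable of the merge target
  hasDelAction = true,
  hasUpAction = true,
  hasInsrtHistUpd = false,
  hasInsrtHistDel = false)
val trxMgr = TranxManager(factTimestamp = System.currentTimeMillis())
val (updateDetails, touchedSegments) =
  mutationAction.handleAction(statusTaggedRDD, executorErrors, trxMgr)
MutationActionFactory.checkErrors(executorErrors)
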
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala
new file mode 100644
index 0000000..8b5e4e8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+/**
+ * It manages the transaction number for update or delete operations. Since update and
+ * delete delta records are applied in separate transactions, it is required to keep
+ * track of the transaction numbers.
+ */
+case class TranxManager(factTimestamp: Long) {
+
+ private var newFactTimestamp: Long = factTimestamp
+ private var mutationMap = Map.empty[MutationAction, Long]
+
+ def getNextTransaction(mutationAction: MutationAction): Long = {
+ if (mutationMap.isEmpty) {
+ mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp))
+ } else {
+ if (mutationMap.get(mutationAction).isDefined) {
+ return mutationMap(mutationAction)
+ } else {
+ newFactTimestamp = newFactTimestamp + 1
+ mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp))
+ }
+ }
+ newFactTimestamp
+ }
+
+ def getLatestTrx: Long = newFactTimestamp
+
+ def getUpdateTrx: Long = {
+ val map = mutationMap.filter(_._1.isInstanceOf[HandleUpdateAction])
+ if (map.isEmpty) {
+ -1
+ } else {
+ map.head._2
+ }
+ }
+
+ def getDeleteTrx: Long = {
+ val map = mutationMap.filter(_._1.isInstanceOf[HandleDeleteAction])
+ if (map.isEmpty) {
+ -1
+ } else {
+ map.head._2
+ }
+ }
+}
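
As a quick illustration (not part of the commit; sparkSession and carbonTable are assumed to be available), TranxManager hands the same timestamp back for a repeated action and bumps it for a new one:

val trxMgr = TranxManager(factTimestamp = 1577700000000L)
val updateAction = HandleUpdateAction(sparkSession, carbonTable)
val deleteAction = HandleDeleteAction(sparkSession, carbonTable)
val updateTrx = trxMgr.getNextTransaction(updateAction)   // 1577700000000
val deleteTrx = trxMgr.getNextTransaction(deleteAction)   // 1577700000001
assert(trxMgr.getNextTransaction(updateAction) == updateTrx)  // same action, same timestamp
assert(trxMgr.getUpdateTrx == updateTrx && trxMgr.getDeleteTrx == deleteTrx)
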
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
new file mode 100644
index 0000000..91f0322
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.mutation.merge
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.util.LongAccumulator
+
+/**
+ * It describes the type of match, like whenMatched or whenNotMatched, and holds all the
+ * actions to be performed when this match passes.
+ */
+abstract class MergeMatch extends Serializable {
+
+ var list: ArrayBuffer[MergeAction] = new ArrayBuffer[MergeAction]()
+
+ def getExp: Option[Column]
+
+ def addAction(action: MergeAction): MergeMatch = {
+ list += action
+ this
+ }
+
+ def getActions: List[MergeAction] = {
+ list.toList
+ }
+
+ def updateActions(actions: List[MergeAction]): MergeMatch = {
+ list = new ArrayBuffer[MergeAction]()
+ list ++= actions
+ this
+ }
+}
+
+/**
+ * It describes the type of action, like update, delete or insert.
+ */
+trait MergeAction extends Serializable
+
+/**
+ * It is the holder that keeps all the matches and the join condition.
+ */
+case class MergeDataSetMatches(joinExpr: Column, matchList: List[MergeMatch]) extends Serializable
+
+case class WhenMatched(expression: Option[Column] = None) extends MergeMatch {
+ override def getExp: Option[Column] = expression
+}
+
+case class WhenNotMatched(expression: Option[Column] = None) extends MergeMatch {
+ override def getExp: Option[Column] = expression
+}
+
+case class WhenNotMatchedAndExistsOnlyOnTarget(expression: Option[Column] = None)
+ extends MergeMatch {
+ override def getExp: Option[Column] = expression
+}
+
+case class UpdateAction(updateMap: Map[Column, Column]) extends MergeAction
+
+case class InsertAction(insertMap: Map[Column, Column]) extends MergeAction
+
+/**
+ * It inserts the history data into the history table.
+ */
+case class InsertInHistoryTableAction(insertMap: Map[Column, Column], historyTable: TableIdentifier)
+ extends MergeAction
+
+case class DeleteAction() extends MergeAction
+
+case class Stats(insertedRows: LongAccumulator,
+ updatedRows: LongAccumulator,
+ deletedRows: LongAccumulator)
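
To show how these pieces fit together, here is a hypothetical MergeDataSetMatches built directly from the case classes above (column names are made up; col comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.col

// Hypothetical target/source columns; one action per match type.
val matches = MergeDataSetMatches(
  joinExpr = col("target.id") === col("source.id"),
  matchList = List(
    WhenMatched(Some(col("source.deleted") === "false"))
      .addAction(UpdateAction(Map(col("value") -> col("source.value")))),
    WhenNotMatched()
      .addAction(InsertAction(Map(
        col("id") -> col("source.id"),
        col("value") -> col("source.value")))),
    WhenNotMatchedAndExistsOnlyOnTarget()
      .addAction(DeleteAction())))
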