This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 5b8ac84 [CARBONDATA-3597] Support Merge for SCD and CCD scenarios 5b8ac84 is described below commit 5b8ac8478847ef6962f39362f8e16f30a9cb6a03 Author: ravipesala <ravi.pes...@gmail.com> AuthorDate: Mon Dec 30 11:38:36 2019 +0800 [CARBONDATA-3597] Support Merge for SCD and CCD scenarios Added dataframe API to merge the datasets online and applies the actions as per the conditions. The supported DataSet API as follows. targetDS.merge(sourceDS, <condition>). whenMatched(<condition>). updateExpr(updateMap). insertExpr(insertMap_u). whenNotMatched(<condition>). insertExpr(insertMap). whenNotMatchedAndExistsOnlyOnTarget(<condition>). delete(). insertHistoryTableExpr(insertMap_d, <table_name>). execute() This closes #3483 --- .../carbondata/core/mutate/CarbonUpdateUtil.java | 53 +- .../impl/DictionaryBasedResultCollector.java | 19 +- .../RestructureBasedDictionaryResultCollector.java | 9 +- .../collector/impl/RowIdBasedResultCollector.java | 9 +- .../scan/executor/impl/AbstractQueryExecutor.java | 1 + .../scan/executor/infos/BlockExecutionInfo.java | 14 + .../carbondata/core/scan/model/QueryModel.java | 14 + .../core/scan/result/BlockletScannedResult.java | 1 - .../scan/result/impl/FilterQueryScannedResult.java | 2 - .../result/impl/NonFilterQueryScannedResult.java | 1 - .../statusmanager/SegmentUpdateStatusManager.java | 74 ++- dev/javastyle-config.xml | 3 - .../carbondata/hadoop/api/CarbonInputFormat.java | 8 +- .../hadoop/api/CarbonTableInputFormat.java | 13 +- .../spark/testsuite/merge/MergeTestCase.scala | 501 +++++++++++++++++++ .../spark/rdd/CarbonDeltaRowScanRDD.scala | 90 ++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +- .../apache/spark/sql/hive/DistributionUtil.scala | 2 +- .../SparkGenericRowReadSupportImpl.java | 59 +++ .../indexserver/DistributedRDDUtils.scala | 4 +- .../scala/org/apache/spark/sql/CarbonSession.scala | 11 + .../command/mutation/DeleteExecution.scala | 260 ++++++---- .../mutation/merge/CarbonMergeDataSetCommand.scala | 531 +++++++++++++++++++++ .../merge/CarbonMergeDataSetException.scala | 33 ++ .../mutation/merge/HistoryTableLoadHelper.scala | 136 ++++++ .../mutation/merge/MergeDataSetBuilder.scala | 134 ++++++ .../command/mutation/merge/MergeProjection.scala | 114 +++++ .../command/mutation/merge/MutationAction.scala | 174 +++++++ .../command/mutation/merge/TranxManager.scala | 62 +++ .../command/mutation/merge/interfaces.scala | 88 ++++ 30 files changed, 2259 insertions(+), 163 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index 2b3096e..c9b4360 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -134,30 +134,7 @@ public class CarbonUpdateUtil { List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails)); for (SegmentUpdateDetails newBlockEntry : updateDetailsList) { - int index = oldList.indexOf(newBlockEntry); - if (index != -1) { - // update the element in existing list. 
- SegmentUpdateDetails blockDetail = oldList.get(index); - if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || (isCompaction)) { - blockDetail - .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp()); - } - blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp()); - blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus()); - blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock()); - // If the start and end time is different then the delta is there in multiple files so - // add them to the list to get the delta files easily with out listing. - if (!blockDetail.getDeleteDeltaStartTimestamp() - .equals(blockDetail.getDeleteDeltaEndTimestamp())) { - blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp()); - blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp()); - } else { - blockDetail.setDeltaFileStamps(null); - } - } else { - // add the new details to the list. - oldList.add(newBlockEntry); - } + mergeSegmentUpdate(isCompaction, oldList, newBlockEntry); } segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier); @@ -180,6 +157,34 @@ public class CarbonUpdateUtil { return status; } + public static void mergeSegmentUpdate(boolean isCompaction, List<SegmentUpdateDetails> oldList, + SegmentUpdateDetails newBlockEntry) { + int index = oldList.indexOf(newBlockEntry); + if (index != -1) { + // update the element in existing list. + SegmentUpdateDetails blockDetail = oldList.get(index); + if (blockDetail.getDeleteDeltaStartTimestamp().isEmpty() || isCompaction) { + blockDetail + .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp()); + } + blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp()); + blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus()); + blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock()); + // If the start and end time is different then the delta is there in multiple files so + // add them to the list to get the delta files easily with out listing. + if (!blockDetail.getDeleteDeltaStartTimestamp() + .equals(blockDetail.getDeleteDeltaEndTimestamp())) { + blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp()); + blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp()); + } else { + blockDetail.setDeltaFileStamps(null); + } + } else { + // add the new details to the list. 
+ oldList.add(newBlockEntry); + } + } + /** * Update table status * @param updatedSegmentsList diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java index d011da3..554d11a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java @@ -98,6 +98,8 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect private Map<Integer, Map<CarbonDimension, ByteBuffer>> mergedComplexDimensionDataMap = new HashMap<>(); + private boolean readOnlyDelta; + public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) { super(blockExecutionInfos); queryDimensions = executionInfo.getProjectionDimensions(); @@ -105,7 +107,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect initDimensionAndMeasureIndexesForFillingData(); isDimensionExists = queryDimensions.length > 0; this.comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap(); - + this.readOnlyDelta = executionInfo.isReadOnlyDelta(); } /** @@ -136,6 +138,16 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect } } while (scannedResult.hasNext() && rowCounter < batchSize) { + scannedResult.incrementCounter(); + if (readOnlyDelta) { + if (!scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { + continue; + } + } else { + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { + continue; + } + } Object[] row = new Object[queryDimensions.length + queryMeasures.length]; if (isDimensionExists) { surrogateResult = scannedResult.getDictionaryKeyIntegerArray(); @@ -151,11 +163,6 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray, comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal()); } - } else { - scannedResult.incrementCounter(); - } - if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { - continue; } fillMeasureData(scannedResult, row); if (isStructQueryType) { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java index 3627e00..522aaf1 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java @@ -74,6 +74,10 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe Map<Integer, GenericQueryType> comlexDimensionInfoMap = executionInfo.getComlexDimensionInfoMap(); while (scannedResult.hasNext() && rowCounter < batchSize) { + scannedResult.incrementCounter(); + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { + continue; + } Object[] row = new Object[queryDimensions.length + queryMeasures.length]; if (isDimensionExists) { surrogateResult = scannedResult.getDictionaryKeyIntegerArray(); @@ -101,11 +105,6 @@ public class RestructureBasedDictionaryResultCollector extends DictionaryBasedRe comlexDimensionInfoMap, row, i, 
executionInfo .getProjectionDimensions()[segmentDimensionsIdx++].getDimension().getOrdinal()); } - } else { - scannedResult.incrementCounter(); - } - if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { - continue; } fillMeasureData(scannedResult, row); listBasedResult.add(row); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java index 2111b02..7a0732b 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RowIdBasedResultCollector.java @@ -45,6 +45,10 @@ public class RowIdBasedResultCollector extends DictionaryBasedResultCollector { byte[][] complexTypeKeyArray; int columnCount = queryDimensions.length + queryMeasures.length; while (scannedResult.hasNext() && rowCounter < batchSize) { + scannedResult.incrementCounter(); + if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { + continue; + } Object[] row = new Object[columnCount + 3]; row[columnCount] = scannedResult.getBlockletNumber(); row[columnCount + 1] = scannedResult.getCurrentPageCounter(); @@ -59,13 +63,8 @@ public class RowIdBasedResultCollector extends DictionaryBasedResultCollector { fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray, comlexDimensionInfoMap, row, i, queryDimensions[i].getDimension().getOrdinal()); } - } else { - scannedResult.incrementCounter(); } row[columnCount + 2] = scannedResult.getCurrentRowId(); - if (scannedResult.containsDeletedRow(scannedResult.getCurrentRowId())) { - continue; - } fillMeasureData(scannedResult, row); listBasedResult.add(row); rowCounter++; diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index c891ba2..ab21819 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -487,6 +487,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { blockExecutionInfo .setTotalNumberDimensionToRead( segmentProperties.getDimensionOrdinalToChunkMapping().size()); + blockExecutionInfo.setReadOnlyDelta(queryModel.isReadOnlyDelta()); if (queryModel.isReadPageByPage()) { blockExecutionInfo.setPrefetchBlocklet(false); LOGGER.info("Query prefetch is: false, read page by page"); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java index 0bd053c..51368dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java @@ -228,6 +228,12 @@ public class BlockExecutionInfo { private ReusableDataBuffer[] measureResusableDataBuffer; /** + * It is used to read only the deleted data of a particular version. It will be used to get the + * old updated/deleted data before update. 
+ */ + private boolean readOnlyDelta; + + /** * @param blockIndex the tableBlock to set */ public void setDataBlock(AbstractIndex blockIndex) { @@ -659,4 +665,12 @@ public class BlockExecutionInfo { public void setMeasureResusableDataBuffer(ReusableDataBuffer[] measureResusableDataBuffer) { this.measureResusableDataBuffer = measureResusableDataBuffer; } + + public boolean isReadOnlyDelta() { + return readOnlyDelta; + } + + public void setReadOnlyDelta(boolean readOnlyDelta) { + this.readOnlyDelta = readOnlyDelta; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index d604e15..3f300e7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -118,6 +118,12 @@ public class QueryModel { */ private boolean isDirectVectorFill; + /** + * It is used to read only the deleted data of a particular version. It will be used to get the + * old updated/deleted data before update. + */ + private boolean readOnlyDelta; + private QueryModel(CarbonTable carbonTable) { tableBlockInfos = new ArrayList<TableBlockInfo>(); this.table = carbonTable; @@ -396,6 +402,14 @@ public class QueryModel { isDirectVectorFill = directVectorFill; } + public boolean isReadOnlyDelta() { + return readOnlyDelta; + } + + public void setReadOnlyDelta(boolean readOnlyDelta) { + this.readOnlyDelta = readOnlyDelta; + } + @Override public String toString() { return String.format("scan on table %s.%s, %d projection columns with filter (%s)", diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java index a4c1c6c..6c0ab4d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java @@ -247,7 +247,6 @@ public abstract class BlockletScannedResult { column = dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter] .fillSurrogateKey(rowId, column, completeKey); } - rowCounter++; return completeKey; } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java index 245135a..f338888 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java @@ -42,7 +42,6 @@ public class FilterQueryScannedResult extends BlockletScannedResult { */ @Override public byte[] getDictionaryKeyArray() { - ++currentRow; return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]); } @@ -52,7 +51,6 @@ public class FilterQueryScannedResult extends BlockletScannedResult { */ @Override public int[] getDictionaryKeyIntegerArray() { - ++currentRow; return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java index c9f6b0c..98576fa 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java @@ -51,7 +51,6 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult { */ @Override public int[] getDictionaryKeyIntegerArray() { - ++currentRow; return getDictionaryKeyIntegerArray(currentRow); } diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 4cefea2..63f9471 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -17,20 +17,8 @@ package org.apache.carbondata.core.statusmanager; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.io.*; +import java.util.*; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -80,15 +68,30 @@ public class SegmentUpdateStatusManager { public SegmentUpdateStatusManager(CarbonTable table, LoadMetadataDetails[] segmentDetails) { + this(table, segmentDetails, null); + } + + /** + * It takes the updateVersion as one of the parameter. Basically user can give on which + * updateVersion user can retrieve the data.It is useful to get the history changed data + * of a particular version. + */ + public SegmentUpdateStatusManager(CarbonTable table, + LoadMetadataDetails[] segmentDetails, String updateVersion) { this.identifier = table.getAbsoluteTableIdentifier(); // current it is used only for read function scenarios, as file update always requires to work // on latest file status. this.segmentDetails = segmentDetails; updateDetails = readLoadMetadata(); + updateUpdateDetails(updateVersion); populateMap(); } public SegmentUpdateStatusManager(CarbonTable table) { + this(table, (String) null); + } + + public SegmentUpdateStatusManager(CarbonTable table, String updateVersion) { this.identifier = table.getAbsoluteTableIdentifier(); // current it is used only for read function scenarios, as file update always requires to work // on latest file status. @@ -104,10 +107,34 @@ public class SegmentUpdateStatusManager { } else { updateDetails = new SegmentUpdateDetails[0]; } + updateUpdateDetails(updateVersion); populateMap(); } /** + * It adds only the SegmentUpdateDetails of given updateVersion, it is used to get the history + * data of updated/deleted data. 
+ */ + private void updateUpdateDetails(String updateVersion) { + if (updateVersion != null) { + List<SegmentUpdateDetails> newupdateDetails = new ArrayList<>(); + for (SegmentUpdateDetails updateDetail : updateDetails) { + if (updateDetail.getDeltaFileStamps() != null) { + if (updateDetail.getDeltaFileStamps().contains(updateVersion)) { + HashSet<String> set = new HashSet<>(); + set.add(updateVersion); + updateDetail.setDeltaFileStamps(set); + newupdateDetails.add(updateDetail); + } + } else if (updateDetail.getDeleteDeltaStartTimestamp().equalsIgnoreCase(updateVersion)) { + newupdateDetails.add(updateDetail); + } + } + updateDetails = newupdateDetails.toArray(new SegmentUpdateDetails[0]); + } + } + + /** * populate the block and its details in a map. */ private void populateMap() { @@ -640,21 +667,30 @@ public class SegmentUpdateStatusManager { * @return */ public SegmentUpdateDetails[] readLoadMetadata() { + // get the updated status file identifier from the table status. + String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier(); + return readLoadMetadata(tableUpdateStatusIdentifier, identifier.getTablePath()); + } + + /** + * This method loads segment update details + * + * @return + */ + public static SegmentUpdateDetails[] readLoadMetadata(String tableUpdateStatusIdentifier, + String tablePath) { Gson gsonObjectToRead = new Gson(); DataInputStream dataInputStream = null; BufferedReader buffReader = null; InputStreamReader inStream = null; SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray; - // get the updated status file identifier from the table status. - String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier(); - if (StringUtils.isEmpty(tableUpdateStatusIdentifier)) { return new SegmentUpdateDetails[0]; } String tableUpdateStatusPath = - CarbonTablePath.getMetadataPath(identifier.getTablePath()) + + CarbonTablePath.getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier; AtomicFileOperations fileOperation = AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath); @@ -735,7 +771,7 @@ public class SegmentUpdateStatusManager { * * @param streams - streams to close. */ - private void closeStreams(Closeable... streams) { + private static void closeStreams(Closeable... streams) { // Added if to avoid NullPointerException in case one stream is being passed as null if (null != streams) { for (Closeable stream : streams) { diff --git a/dev/javastyle-config.xml b/dev/javastyle-config.xml index 824cbc8..332c85a 100644 --- a/dev/javastyle-config.xml +++ b/dev/javastyle-config.xml @@ -175,9 +175,6 @@ <message key="import.redundancy" value="Redundant import {0}."/> </module> - <!-- Checks for star import. --> - <module name="AvoidStarImport"/> - <!-- Checks for placement of the left curly brace ('{'). 
--> <module name="LeftCurly"/> diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 10aabf2..ae8db43 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -120,6 +120,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap"; private static final String READ_COMMITTED_SCOPE = "mapreduce.input.carboninputformat.read.committed.scope"; + private static final String READ_ONLY_DELTA = "readDeltaOnly"; // record segment number and hit blocks protected int numSegments = 0; @@ -688,11 +689,16 @@ m filterExpression if (dataMapFilter != null) { checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit); } - return new QueryModelBuilder(carbonTable) + QueryModel queryModel = new QueryModelBuilder(carbonTable) .projectColumns(projectColumns) .filterExpression(dataMapFilter) .dataConverter(getDataTypeConverter(configuration)) .build(); + String readDeltaOnly = configuration.get(READ_ONLY_DELTA); + if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) { + queryModel.setReadOnlyDelta(true); + } + return queryModel; } /** diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 5468c24..c47cdd6 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -85,6 +85,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { "mapreduce.input.carboninputformat.transactional"; public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; + public static final String UPDATE_DELTA_VERSION = "updateDeltaVersion"; // a cache for carbon table, it will be used in task side private CarbonTable carbonTable; private ReadCommittedScope readCommittedScope; @@ -107,9 +108,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } this.readCommittedScope = getReadCommitted(job, identifier); LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); - - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails); + String updateDeltaVersion = job.getConfiguration().get(UPDATE_DELTA_VERSION); + SegmentUpdateStatusManager updateStatusManager; + if (updateDeltaVersion != null) { + updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails, updateDeltaVersion); + } else { + updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails); + } List<String> invalidSegmentIds = new ArrayList<>(); List<Segment> streamSegments = null; // get all valid segments and set them into the configuration diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala new file mode 100644 index 0000000..f91bce0 --- /dev/null +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.merge + +import scala.collection.JavaConverters._ +import java.sql.Date + +import org.apache.spark.sql._ +import org.apache.spark.sql.CarbonSession._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType} +import org.scalatest.BeforeAndAfterAll + +/** + * Test Class for join query with orderby and limit + */ + +class MergeTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + + } + + def generateData(numOrders: Int = 10): DataFrame = { + import sqlContext.implicits._ + sqlContext.sparkContext.parallelize(1 to numOrders, 4) + .map { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) + }.toDF("id", "name", "c_name", "quantity", "price", "state") + } + + def generateFullCDC( + numOrders: Int, + numUpdatedOrders: Int, + newState: Int, + oldState: Int, + numNewOrders: Int + ): DataFrame = { + import sqlContext.implicits._ + val ds1 = sqlContext.sparkContext.parallelize(numNewOrders+1 to (numOrders), 4) + .map {x => + if (x <= numNewOrders + numUpdatedOrders) { + ("id"+x, s"order$x",s"customer$x", x*10, x*75, newState) + } else { + ("id"+x, s"order$x",s"customer$x", x*10, x*75, oldState) + } + }.toDF("id", "name", "c_name", "quantity", "price", "state") + val ds2 = sqlContext.sparkContext.parallelize(1 to numNewOrders, 4) + .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, oldState) + }.toDS().toDF() + ds1.union(ds2) + } + + private def initialize = { + val initframe = generateData(10) + initframe.write + .format("carbondata") + .option("tableName", "order") + .mode(SaveMode.Overwrite) + .save() + + val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load() + val dwSelframe = dwframe.as("A") + + val odsframe = generateFullCDC(10, 2, 2, 1, 2).as("B") + (dwSelframe, odsframe) + } + + test("test basic merge update with all mappings") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + val updateMap = Map("id" -> "A.id", + "name" -> "B.name", + "c_name" -> "B.c_name", + "quantity" -> "B.quantity", + "price" -> "B.price", + "state" -> "B.state").asInstanceOf[Map[Any, Any]] + + 
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched( + col("A.state") =!= col("B.state")).updateExpr(updateMap).execute() + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test basic merge update with few mappings") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + val updateMap = Map(col("id") -> col("A.id"), + col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]] + + dwSelframe.merge(odsframe, "A.id=B.id").whenMatched("A.state <> B.state").updateExpr(updateMap).execute() + + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test basic merge update with few mappings and expressions") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + val updateMap = Map("id" -> "A.id", + "price" -> "B.price * 100", + "state" -> "B.state").asInstanceOf[Map[Any, Any]] + + dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched( + col("A.state") =!= col("B.state")).updateExpr(updateMap).execute() + + checkAnswer(sql("select price from order where where state = 2"), Seq(Row(22500), Row(30000))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test basic merge update with few mappings with out condition") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + val updateMap = Map(col("id") -> col("A.id"), + col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]] + + dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().updateExpr(updateMap).execute() + + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test merge insert with condition") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + "c_name" -> col("B.c_name"), + col("quantity") -> "B.quantity", + col("price") -> col("B.price"), + col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]] + + dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))). + whenNotMatched(col("A.id").isNull.and(col("B.id").isNotNull)). 
+ insertExpr(insertMap).execute() + + checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2))) + } + + test("test merge update and insert with out condition") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> col("B.price"), + col("state") -> col("B.state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap))) + matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) + + val st = System.currentTimeMillis() + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test merge update and insert with condition") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> col("B.price"), + col("state") -> col("B.state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap))) + matches ++= Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap))) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order"), Seq(Row(12))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test merge update and insert with condition and expression") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> expr("B.price * 100"), + col("state") -> col("B.state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap))) + matches ++= Seq(WhenNotMatched(Some(col("A.id").isNull.and(col("B.id").isNotNull))).addAction(InsertAction(insertMap))) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order"), Seq(Row(12))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + checkAnswer(sql("select price from order where id = 
'newid1'"), Seq(Row(7500))) + } + + test("test merge with only delete action") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction())) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order"), Seq(Row(8))) + } + + test("test merge update and delete action") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap))) + matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction())) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order"), Seq(Row(8))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + } + + test("test merge update and insert with condition and expression and delete action") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> expr("B.price * 100"), + col("state") -> col("B.state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap))) + matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) + matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction())) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order"), Seq(Row(10))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500))) + } + + test("test merge update with insert, insert with condition and expression and delete with insert action") { + sql("drop table if exists order") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> "B.price + 1", + col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]] + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> expr("B.price * 100"), + col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]] + + val insertMap_u = Map(col("id") -> col("A.id"), + col("name") -> col("A.name"), + col("c_name") -> lit("insert"), + col("quantity") -> col("A.quantity"), + col("price") -> expr("A.price"), + col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]] + + val insertMap_d = Map(col("id") -> col("A.id"), + col("name") -> col("A.name"), + 
col("c_name") -> lit("delete"), + col("quantity") -> col("A.quantity"), + col("price") -> expr("A.price"), + col("state") -> col("A.state")).asInstanceOf[Map[Any, Any]] + + dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))). + whenMatched(col("A.state") =!= col("B.state")). + updateExpr(updateMap).insertExpr(insertMap_u). + whenNotMatched(). + insertExpr(insertMap). + whenNotMatchedAndExistsOnlyOnTarget(). + delete(). + insertExpr(insertMap_d). + execute() + sql("select * from order").show() + checkAnswer(sql("select count(*) from order where c_name = 'delete'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order where c_name = 'insert'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order"), Seq(Row(14))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500))) + } + + test("test merge update with insert, insert with condition and expression and delete with insert history action") { + sql("drop table if exists order") + sql("drop table if exists order_hist") + sql("create table order_hist(id string, name string, c_name string, quantity int, price int, state int) stored as carbondata") + val (dwSelframe, odsframe) = initialize + + var matches = Seq.empty[MergeMatch] + val updateMap = Map(col("id") -> col("A.id"), + col("price") -> expr("B.price + 1"), + col("state") -> col("B.state")) + + val insertMap = Map(col("id") -> col("B.id"), + col("name") -> col("B.name"), + col("c_name") -> col("B.c_name"), + col("quantity") -> col("B.quantity"), + col("price") -> expr("B.price * 100"), + col("state") -> col("B.state")) + + val insertMap_u = Map(col("id") -> col("id"), + col("name") -> col("name"), + col("c_name") -> lit("insert"), + col("quantity") -> col("quantity"), + col("price") -> expr("price"), + col("state") -> col("state")) + + val insertMap_d = Map(col("id") -> col("id"), + col("name") -> col("name"), + col("c_name") -> lit("delete"), + col("quantity") -> col("quantity"), + col("price") -> expr("price"), + col("state") -> col("state")) + + matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u, TableIdentifier("order_hist")))) + matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap))) + matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d, TableIdentifier("order_hist")))) + + CarbonMergeDataSetCommand(dwSelframe, + odsframe, + MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession) + checkAnswer(sql("select count(*) from order"), Seq(Row(10))) + checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2))) + checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500))) + checkAnswer(sql("select count(*) from order_hist where c_name = 'delete'"), Seq(Row(2))) + checkAnswer(sql("select count(*) from order_hist where c_name = 'insert'"), Seq(Row(2))) + } + + test("check the scd ") { + sql("drop table if exists customers") + + val initframe = + sqlContext.sparkSession.createDataFrame(Seq( + Row(1, "old address for 1", false, null, Date.valueOf("2018-02-01")), + Row(1, "current address for 1", true, Date.valueOf("2018-02-01"), null), + Row(2, "current address for 2", true, Date.valueOf("2018-02-01"), null), + Row(3, "current address for 3", true, Date.valueOf("2018-02-01"), null) + ).asJava, 
StructType(Seq(StructField("customerId", IntegerType), StructField("address", StringType), StructField("current", BooleanType), StructField("effectiveDate", DateType), StructField("endDate", DateType)))) + initframe.printSchema() + initframe.write + .format("carbondata") + .option("tableName", "customers") + .mode(SaveMode.Overwrite) + .save() + var customer = sqlContext.read.format("carbondata").option("tableName", "customers").load() + customer = customer.as("A") + var updates = + sqlContext.sparkSession.createDataFrame(Seq( + Row(1, "new address for 1", Date.valueOf("2018-03-03")), + Row(3, "current address for 3", Date.valueOf("2018-04-04")), // new address same as current address for customer 3 + Row(4, "new address for 4", Date.valueOf("2018-04-04")) + ).asJava, StructType(Seq(StructField("customerId", IntegerType), StructField("address", StringType), StructField("effectiveDate", DateType)))) + updates = updates.as("B") + + val updateMap = Map(col("current") -> lit(false), + col("endDate") -> col("B.effectiveDate")).asInstanceOf[Map[Any, Any]] + + val insertMap = Map(col("customerId") -> col("B.customerId"), + col("address") -> col("B.address"), + col("current") -> lit(true), + col("effectiveDate") -> col("B.effectiveDate"), + col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]] + + val insertMap_u = Map(col("customerId") -> col("B.customerId"), + col("address") -> col("B.address"), + col("current") -> lit(true), + col("effectiveDate") -> col("B.effectiveDate"), + col("endDate") -> lit(null)).asInstanceOf[Map[Any, Any]] + + customer.merge(updates, "A.customerId=B.customerId"). + whenMatched((col("A.address") =!= col("B.address")).and(col("A.current").equalTo(lit(true)))). + updateExpr(updateMap). + insertExpr(insertMap_u). + whenNotMatched(col("A.customerId").isNull.and(col("B.customerId").isNotNull)). + insertExpr(insertMap). 
+ execute() + + checkAnswer(sql("select count(*) from customers"), Seq(Row(6))) + checkAnswer(sql("select count(*) from customers where current='true'"), Seq(Row(4))) + checkAnswer(sql("select count(*) from customers where effectivedate is not null and enddate is not null"), Seq(Row(1))) + + } + + test("check the ccd ") { + sql("drop table if exists target") + + val initframe = sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "0"), + Row("b", "1"), + Row("c", "2"), + Row("d", "3") + ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType)))) + + initframe.write + .format("carbondata") + .option("tableName", "target") + .mode(SaveMode.Overwrite) + .save() + val target = sqlContext.read.format("carbondata").option("tableName", "target").load() + var ccd = + sqlContext.sparkSession.createDataFrame(Seq( + Row("a", "10", false, 0), + Row("a", null, true, 1), // a was updated and then deleted + Row("b", null, true, 2), // b was just deleted once + Row("c", null, true, 3), // c was deleted and then updated twice + Row("c", "20", false, 4), + Row("c", "200", false, 5), + Row("e", "100", false, 6) // new key + ).asJava, + StructType(Seq(StructField("key", StringType), + StructField("newValue", StringType), + StructField("deleted", BooleanType), StructField("time", IntegerType)))) + ccd.createOrReplaceTempView("changes") + + ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)") + + val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]] + + val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]] + + target.as("A").merge(ccd.as("B"), "A.key=B.key"). + whenMatched("B.deleted=false"). + updateExpr(updateMap). + whenNotMatched("B.deleted=false"). + insertExpr(insertMap). + whenMatched("B.deleted=true"). + delete().execute() + checkAnswer(sql("select count(*) from target"), Seq(Row(3))) + checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100"))) + } + + override def afterAll { + sql("drop table if exists order") + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala new file mode 100644 index 0000000..ea32bdf --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.Partition +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.converter.SparkDataTypeConverterImpl +import org.apache.carbondata.core.datamap.DataMapFilter +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager +import org.apache.carbondata.core.util.DataTypeConverter +import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport +import org.apache.carbondata.spark.InitInputMetrics + +/** + * It can get the deleted/updated records on any particular update version. It is useful to get the + * records changed on any particular update transaction. + */ +class CarbonDeltaRowScanRDD[T: ClassTag]( + @transient private val spark: SparkSession, + @transient private val serializedTableInfo: Array[Byte], + @transient private val tableInfo: TableInfo, + @transient override val partitionNames: Seq[PartitionSpec], + override val columnProjection: CarbonProjection, + var filter: DataMapFilter, + identifier: AbsoluteTableIdentifier, + inputMetricsStats: InitInputMetrics, + override val dataTypeConverterClz: Class[_ <: DataTypeConverter] = + classOf[SparkDataTypeConverterImpl], + override val readSupportClz: Class[_ <: CarbonReadSupport[_]] = + SparkReadSupport.readSupportClass, + deltaVersionToRead: String) extends + CarbonScanRDD[T]( + spark, + columnProjection, + filter, + identifier, + serializedTableInfo, + tableInfo, + inputMetricsStats, + partitionNames, + dataTypeConverterClz, + readSupportClz) { + override def internalGetPartitions: Array[Partition] = { + val table = CarbonTable.buildFromTableInfo(getTableInfo) + val updateStatusManager = new SegmentUpdateStatusManager(table, deltaVersionToRead) + + val parts = super.internalGetPartitions + parts.map { p => + val partition = p.asInstanceOf[CarbonSparkPartition] + val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s => + updateStatusManager.getDetailsForABlock( + CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId, s.getBlockPath)) != null + }.asJava + new CarbonSparkPartition(partition.rddId, partition.index, + new CarbonMultiBlockSplit(splits, partition.multiBlockSplit.getLocations)) + }.filter(p => p.multiBlockSplit.getAllSplits.size() > 0).asInstanceOf[Array[Partition]] + } + + override def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { + val format = super.createInputFormat(conf) + conf.set("updateDeltaVersion", deltaVersionToRead) + conf.set("readDeltaOnly", "true") + format + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index a6dfc8a..5d75742 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -638,7 +638,7 @@ class CarbonScanRDD[T: ClassTag]( } 
- private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { + def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { val format = new CarbonTableInputFormat[Object] CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath)) diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala index ca35adc..62e52d0 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala @@ -206,7 +206,7 @@ object DistributionUtil { * @param sparkContext * @return */ - private def getConfiguredExecutors(sparkContext: SparkContext): Int = { + def getConfiguredExecutors(sparkContext: SparkContext): Int = { var confExecutors: Int = 0 if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) { // default value for spark.dynamicAllocation.maxExecutors is infinity diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java new file mode 100644 index 0000000..a76c79d --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkGenericRowReadSupportImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.readsupport; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Calendar; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; + +public class SparkGenericRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> { + + @Override + public void initialize(CarbonColumn[] carbonColumns, + CarbonTable carbonTable) throws IOException { + super.initialize(carbonColumns, carbonTable); + } + + @Override + public Row readRow(Object[] data) { + assert (data.length == dictionaries.length); + for (int i = 0; i < dictionaries.length; i++) { + if (dictionaries[i] != null) { + data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]); + } + if (dataTypes[i] == DataTypes.DATE) { + Calendar c = Calendar.getInstance(); + c.setTime(new Date(0)); + c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]); + data[i] = new Date(c.getTime().getTime()); + } else if (dataTypes[i] == DataTypes.TIMESTAMP) { + data[i] = new Timestamp((long) data[i] / 1000); + } + } + return new GenericRow(data); + } +} diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala index 233ad4d..bf5bc40 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hdfs.DFSClient.Conf import org.apache.hadoop.mapreduce.InputSplit import org.apache.spark.Partition import org.apache.spark.sql.SparkSession @@ -33,10 +32,9 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment} import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope} +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel object DistributedRDDUtils { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index deefcd1..063eaf5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.QueryExecution +import 
org.apache.spark.sql.execution.command.mutation.merge.MergeDataSetBuilder import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.sql.profiler.{Profiler, SQLStart} @@ -283,6 +284,16 @@ object CarbonSession { } } + implicit class DataSetMerge(val ds: Dataset[Row]) { + def merge(srcDS: Dataset[Row], expr: String): MergeDataSetBuilder = { + new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession) + } + + def merge(srcDS: Dataset[Row], expr: Column): MergeDataSetBuilder = { + new MergeDataSetBuilder(ds, srcDS, expr, ds.sparkSession) + } + } + def threadSet(key: String, value: String): Unit = { var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (currentThreadSessionInfo == null) { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index ecb74dc..d744e96 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} -import org.apache.carbondata.core.mutate.data.RowCountDetailsVO +import org.apache.carbondata.core.mutate.data.{BlockMappingVO, RowCountDetailsVO} import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} @@ -55,35 +55,71 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl object DeleteExecution { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + def deleteDeltaExecution( + databaseNameOp: Option[String], + tableName: String, + sparkSession: SparkSession, + dataRdd: RDD[Row], + timestamp: String, + isUpdateOperation: Boolean, + executorErrors: ExecutionErrors): (Seq[Segment], Long) = { + + val (res, blockMappingVO) = deleteDeltaExecutionInternal(databaseNameOp, + tableName, sparkSession, dataRdd, timestamp, isUpdateOperation, executorErrors) + var segmentsTobeDeleted = Seq.empty[Segment] + var operatedRowCount = 0L + // if no loads are present then no need to do anything. + if (res.flatten.isEmpty) { + return (segmentsTobeDeleted, operatedRowCount) + } + val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + // update new status file + segmentsTobeDeleted = + checkAndUpdateStatusFiles(executorErrors, + res, carbonTable, timestamp, + blockMappingVO, isUpdateOperation) + + if (executorErrors.failureCauses == FailureCauses.NONE) { + operatedRowCount = res.flatten.map(_._2._3).sum + } + (segmentsTobeDeleted, operatedRowCount) + } + /** * generate the delete delta files in each segment as per the RDD. * @return it gives the segments which needs to be deleted. 
*/ - def deleteDeltaExecution( + def deleteDeltaExecutionInternal( databaseNameOp: Option[String], tableName: String, sparkSession: SparkSession, dataRdd: RDD[Row], timestamp: String, isUpdateOperation: Boolean, - executorErrors: ExecutionErrors): (Seq[Segment], Long) = { + executorErrors: ExecutionErrors, + tupleId: Option[Int] = None): + (Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], BlockMappingVO) = { var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val tablePath = absoluteTableIdentifier.getTablePath - var segmentsTobeDeleted = Seq.empty[Segment] - var operatedRowCount = 0L val deleteRdd = if (isUpdateOperation) { val schema = org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField( CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, org.apache.spark.sql.types.StringType))) - val rdd = dataRdd - .map(row => Row(row.get(row.fieldIndex( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))) + val rdd = tupleId match { + case Some(id) => + dataRdd + .map(row => Row(row.get(id))) + case _ => + dataRdd + .map(row => Row(row.get(row.fieldIndex( + CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))) + } sparkSession.createDataFrame(rdd, schema).rdd } else { dataRdd @@ -91,16 +127,26 @@ object DeleteExecution { val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier) CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo) - val keyRdd = deleteRdd.map({ row => - val tupleId: String = row - .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) - (key, row) - }).groupByKey() + val keyRdd = tupleId match { + case Some(id) => + deleteRdd.map { row => + val tupleId: String = row.getString(id) + val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) + (key, row) + }.groupByKey() + case _ => + deleteRdd.map { row => + val tupleId: String = row + .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) + val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) + (key, row) + }.groupByKey() + } // if no loads are present then no need to do anything. if (keyRdd.partitions.length == 0) { - return (segmentsTobeDeleted, operatedRowCount) + return (Array.empty[List[(SegmentStatus, + (SegmentUpdateDetails, ExecutionErrors, Long))]], null) } val blockMappingVO = carbonInputFormat.getBlockRowCount( @@ -144,76 +190,6 @@ object DeleteExecution { result }).collect() - // if no loads are present then no need to do anything. - if (res.flatten.isEmpty) { - return (segmentsTobeDeleted, operatedRowCount) - } - - // update new status file - checkAndUpdateStatusFiles() - - // all or none : update status file, only if complete delete opeartion is successfull. 
- def checkAndUpdateStatusFiles(): Unit = { - val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() - val segmentDetails = new util.HashSet[Segment]() - res.foreach(resultOfSeg => resultOfSeg.foreach( - resultOfBlock => { - if (resultOfBlock._1 == SegmentStatus.SUCCESS) { - blockUpdateDetailsList.add(resultOfBlock._2._1) - segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName)) - // if this block is invalid then decrement block count in map. - if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) { - CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, - blockMappingVO.getSegmentNumberOfBlockMapping) - } - } else { - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - val errorMsg = - "Delete data operation is failed due to failure in creating delete delta file for " + - "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + - resultOfBlock._2._1.getBlockName - executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - executorErrors.errorMsg = resultOfBlock._2._2.errorMsg - - if (executorErrors.failureCauses == FailureCauses.NONE) { - executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - executorErrors.errorMsg = errorMsg - } - LOGGER.error(errorMsg) - return - } - } - ) - ) - - val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil - .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) - - segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala - - // this is delete flow so no need of putting timestamp in the status file. - if (CarbonUpdateUtil - .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) && - CarbonUpdateUtil - .updateTableMetadataStatus(segmentDetails, - carbonTable, - timestamp, - !isUpdateOperation, - listOfSegmentToBeMarkedDeleted) - ) { - LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }") - } else { - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - val errorMessage = "Delete data operation is failed due to failure " + - "in table status updation." - LOGGER.error("Delete data operation is failed due to failure in table status updation.") - executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE - executorErrors.errorMsg = errorMessage - } - } - def deleteDeltaFunc(index: Int, key: String, iter: Iterator[Row], @@ -322,10 +298,118 @@ object DeleteExecution { resultIter } - if (executorErrors.failureCauses == FailureCauses.NONE) { - operatedRowCount = res.flatten.map(_._2._3).sum + (res, blockMappingVO) + } + + // all or none : update status file, only if complete delete opeartion is successfull. + def checkAndUpdateStatusFiles( + executorErrors: ExecutionErrors, + res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], + carbonTable: CarbonTable, + timestamp: String, + blockMappingVO: BlockMappingVO, + isUpdateOperation: Boolean): Seq[Segment] = { + val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() + val segmentDetails = new util.HashSet[Segment]() + res.foreach(resultOfSeg => resultOfSeg.foreach( + resultOfBlock => { + if (resultOfBlock._1 == SegmentStatus.SUCCESS) { + blockUpdateDetailsList.add(resultOfBlock._2._1) + segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName)) + // if this block is invalid then decrement block count in map. 
+ if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) { + CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, + blockMappingVO.getSegmentNumberOfBlockMapping) + } + } else { + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + val errorMsg = + "Delete data operation is failed due to failure in creating delete delta file for " + + "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + + resultOfBlock._2._1.getBlockName + executorErrors.failureCauses = resultOfBlock._2._2.failureCauses + executorErrors.errorMsg = resultOfBlock._2._2.errorMsg + + if (executorErrors.failureCauses == FailureCauses.NONE) { + executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + executorErrors.errorMsg = errorMsg + } + LOGGER.error(errorMsg) + return Seq.empty[Segment] + } + })) + + val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil + .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) + + val segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala + + // this is delete flow so no need of putting timestamp in the status file. + if (CarbonUpdateUtil + .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) && + CarbonUpdateUtil + .updateTableMetadataStatus(segmentDetails, + carbonTable, + timestamp, + !isUpdateOperation, + listOfSegmentToBeMarkedDeleted) + ) { + LOGGER.info(s"Delete data operation is successful for " + + s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + } else { + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + val errorMessage = "Delete data operation is failed due to failure " + + "in table status updation." + LOGGER.error("Delete data operation is failed due to failure in table status updation.") + executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE + executorErrors.errorMsg = errorMessage } - (segmentsTobeDeleted, operatedRowCount) + segmentsTobeDeleted + } + + // all or none : update status file, only if complete delete opeartion is successfull. + def processSegments(executorErrors: ExecutionErrors, + res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]], + carbonTable: CarbonTable, + timestamp: String, + blockMappingVO: BlockMappingVO): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() + val segmentDetails = new util.HashSet[Segment]() + res.foreach(resultOfSeg => resultOfSeg.foreach( + resultOfBlock => { + if (resultOfBlock._1 == SegmentStatus.SUCCESS) { + blockUpdateDetailsList.add(resultOfBlock._2._1) + segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName)) + // if this block is invalid then decrement block count in map. 
+ if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) { + CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, + blockMappingVO.getSegmentNumberOfBlockMapping) + } + } else { + // In case of failure , clean all related delete delta files + CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) + val errorMsg = + "Delete data operation is failed due to failure in creating delete delta file for " + + "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + + resultOfBlock._2._1.getBlockName + executorErrors.failureCauses = resultOfBlock._2._2.failureCauses + executorErrors.errorMsg = resultOfBlock._2._2.errorMsg + + if (executorErrors.failureCauses == FailureCauses.NONE) { + executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + executorErrors.errorMsg = errorMsg + } + LOGGER.error(errorMsg) + return (blockUpdateDetailsList, Seq.empty[Segment]) + } + })) + + val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil + .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) + + (blockUpdateDetailsList, listOfSegmentToBeMarkedDeleted.asScala) } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala new file mode 100644 index 0000000..3c0acc6 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericRowWithSchema} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.DistributionUtil +import org.apache.spark.sql.types.{IntegerType, StringType, StructField} +import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable +import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder} +import org.apache.carbondata.processing.util.CarbonLoaderUtil + +/** + * This command merges the data of the source dataset into the target dataset backed by a carbon table. + * @param targetDsOri Target dataset to merge the data into. This dataset should be backed by a carbon table + * @param srcDS Source dataset; it can be any data. + * @param mergeMatches It contains the join condition and the list of match conditions to apply. + */ +case class CarbonMergeDataSetCommand( + targetDsOri: Dataset[Row], + srcDS: Dataset[Row], + var mergeMatches: MergeDataSetMatches) + extends DataCommand { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + /** + * It merges the data of the source dataset into the target dataset backed by a carbon table. Basically it + * makes a full outer join of source and target and applies the given conditions as "case + * when" to get the status used to process each row. The status could be insert/update/delete. + * It can also insert the history (update/delete) data into a history table. + * + */ + override def processData(sparkSession: SparkSession): Seq[Row] = { + val rltn = collectCarbonRelation(targetDsOri.logicalPlan) + // Target dataset must be backed by a carbondata table. + if (rltn.length != 1) { + throw new UnsupportedOperationException( + "Carbon table supposed to be present in merge dataset") + } + // validate the merge matches and actions.
+ validateMergeActions(mergeMatches, targetDsOri, sparkSession) + val carbonTable = rltn.head.carbonRelation.carbonTable + val hasDelAction = mergeMatches.matchList + .exists(_.getActions.exists(_.isInstanceOf[DeleteAction])) + val hasUpdateAction = mergeMatches.matchList + .exists(_.getActions.exists(_.isInstanceOf[UpdateAction])) + val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches) + // Get all the required columns of targetDS by going through all match conditions and actions. + val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession) + // select only the required columns, it can avoid lot of and shuffling. + val targetDs = targetDsOri.select(columns: _*) + // Update the update mapping with unfilled columns.From here on system assumes all mappings + // are existed. + mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs) + // Lets generate all conditions combinations as one column and add them as 'status'. + val condition = generateStatusColumnWithAllCombinations(mergeMatches) + + // Add the tupleid udf to get the tupleid to generate delete delta. + val frame = targetDs.withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, + expr("getTupleId()")).withColumn("exist_on_target", lit(1)).join( + srcDS.withColumn("exist_on_src", lit(1)), + // Do the full outer join to get the data from both sides without missing anything. + // TODO As per the match conditions choose the join, sometimes it might be possible to use + // left_outer join. + mergeMatches.joinExpr, "full_outer").withColumn("status", condition) + if (LOGGER.isDebugEnabled) { + frame.explain() + } + val tableCols = + carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName). + filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + val builder = new CarbonLoadModelBuilder(carbonTable) + val options = Seq(("fileheader", tableCols.mkString(","))).toMap + val model = builder.build(options.asJava, CarbonUpdateUtil.readCurrentTime, "1") + model.setLoadWithoutConverterStep(true) + val newLoadMetaEntry = new LoadMetadataDetails + CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry, + SegmentStatus.INSERT_IN_PROGRESS, + model.getFactTimeStamp, + false) + CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false) + + model.setCsvHeader(tableCols.mkString(",")) + + val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m => + m.getActions.map { + case u: UpdateAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, u) + case i: InsertAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, i) + case d: DeleteAction => MergeProjection(tableCols, frame, rltn.head, sparkSession, d) + case _ => null + }.filter(_ != null) + } + + val st = System.currentTimeMillis() + // Create accumulators to log the stats + val stats = Stats(createLongAccumalator("insertedRows"), + createLongAccumalator("updatedRows"), + createLongAccumalator("deletedRows")) + val processedRDD = processIUD(sparkSession, frame, carbonTable, model, projections, stats) + + val executorErrors = ExecutionErrors(FailureCauses.NONE, "") + val trxMgr = TranxManager(model.getFactTimeStamp) + + val mutationAction = MutationActionFactory.getMutationAction(sparkSession, + carbonTable, hasDelAction, hasUpdateAction, + insertHistOfUpdate, insertHistOfDelete) + + val tuple = mutationAction.handleAction(processedRDD, executorErrors, trxMgr) + + // In case user only has insert action. 
+ if (!(hasDelAction || hasUpdateAction)) { + processedRDD.count() + } + LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}") + LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}") + LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}") + LOGGER.info( + " Time taken to merge data : " + tuple + " :: " + (System.currentTimeMillis() - st)) + + val segment = new Segment(model.getSegmentId, + SegmentFileStore.genSegmentFileName( + model.getSegmentId, + System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT, + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, + model.getSegmentId), Map.empty[String, String].asJava) + val writeSegment = + SegmentFileStore.writeSegmentFile(carbonTable, segment) + + if (writeSegment) { + SegmentFileStore.updateTableStatusFile( + carbonTable, + model.getSegmentId, + segment.getSegmentFileName, + carbonTable.getCarbonTableIdentifier.getTableId, + new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName), + SegmentStatus.SUCCESS) + } else { + CarbonLoaderUtil.updateTableStatusForFailure(model) + } + + if (hasDelAction || hasUpdateAction) { + if (CarbonUpdateUtil.updateSegmentStatus(tuple._1, carbonTable, + trxMgr.getLatestTrx.toString, false) && + CarbonUpdateUtil + .updateTableMetadataStatus( + model.getLoadMetadataDetails.asScala.map(l => + new Segment(l.getMergedLoadName, + l.getSegmentFile)).toSet.asJava, + carbonTable, + trxMgr.getLatestTrx.toString, + true, + tuple._2.asJava)) { + LOGGER.info(s"Merge data operation is successful for " + + s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + } else { + throw new CarbonMergeDataSetException("Saving update status or table status failed") + } + } + // Load the history table if the inserthistorytable action is added by user. + HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head, carbonTable, + trxMgr, mutationAction, mergeMatches) + Seq.empty + } + + /** + * As per the status of the row either it inserts the data or update/delete the data. + */ + private def processIUD(sparkSession: SparkSession, + frame: DataFrame, + carbonTable: CarbonTable, + model: CarbonLoadModel, + projections: Seq[Seq[MergeProjection]], + stats: Stats) = { + val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf() + val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, conf) + val frameCols = frame.queryExecution.analyzed.output + val status = frameCols.length - 1 + val tupleId = frameCols.zipWithIndex + .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2 + val insertedRows = stats.insertedRows + val updatedRows = stats.updatedRows + val deletedRows = stats.deletedRows + frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)). 
+ mapPartitionsWithIndex { case (index, iter) => + val confB = config.value.value + CarbonTableOutputFormat.setCarbonTable(confB, carbonTable) + model.setTaskNo(index.toString) + CarbonTableOutputFormat.setLoadModel(confB, model) + val jobId = new JobID(UUID.randomUUID.toString, 0) + val task = new TaskID(jobId, TaskType.MAP, index) + val attemptID = new TaskAttemptID(task, index) + val context = new TaskAttemptContextImpl(confB, attemptID) + val writer = new CarbonTableOutputFormat().getRecordWriter(context) + val writable = new ObjectArrayWritable + val projLen = projections.length + val schema = + org.apache.spark.sql.types.StructType(Seq( + StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType), + StructField("status", IntegerType))) + new Iterator[Row] { + override def hasNext: Boolean = { + if (iter.hasNext) { + true + } else { + writer.close(context) + false + } + } + + override def next(): Row = { + val row = iter.next() + val rowWithSchema = row.asInstanceOf[GenericRowWithSchema] + val is = row.get(status) + var isUpdate = false + var isDelete = false + var insertedCount = 0 + if (is != null) { + val isInt = is.asInstanceOf[Int] + var i = 0 + while (i < projLen) { + if ((isInt & (1 << i)) == (1 << i)) { + projections(i).foreach { p => + if (!p.isDelete) { + if (p.isUpdate) { + isUpdate = p.isUpdate + } + writable.set(p(rowWithSchema)) + writer.write(NullWritable.get(), writable) + insertedCount += 1 + } else { + isDelete = true + } + } + } + i = i + 1 + } + } + val newArray = new Array[Any](2) + newArray(0) = row.getString(tupleId) + if (isUpdate && isDelete) { + newArray(1) = 102 + updatedRows.add(1) + deletedRows.add(1) + insertedCount -= 1 + } else if (isUpdate) { + updatedRows.add(1) + newArray(1) = 101 + insertedCount -= 1 + } else if (isDelete) { + newArray(1) = 100 + deletedRows.add(1) + } else { + newArray(1) = is + } + insertedRows.add(insertedCount) + new GenericRowWithSchema(newArray, schema) + } + } + }.cache() + } + + private def createLongAccumalator(name: String) = { + val acc = new LongAccumulator + acc.setValue(0) + acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), false) + AccumulatorContext.register(acc) + acc + } + + /** + * It generates conditions for all possible scenarios and adds an integer number for each match. + * There could be scenarios where one row matches two conditions, so it should apply the actions + * of both the matches to the row. + * For example : + * whenMatched(a=c1) + * update() + * whenNotMatched(b=d1) + * insert() + * whenNotMatched(b=d2) + * insert() + * + * The above merge statement will be converted to + * (case when a=c1 and b=d1 and b=d2 then 7 + * when a=c1 and b=d1 then 6 + * when a=c1 and b=d2 then 5 + * when a=c1 then 4 + * when b=d1 and b=d2 then 3 + * when b=d1 then 2 + * when b=d2 then 1) as status + * + * So it is not recommended to use too many merge conditions, as they increase the number of + * case when statements exponentially.
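+ * Each match in the match list owns one bit of the generated status value, so a row that + * satisfies several match conditions carries the actions of all of those matches.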
+ * + * @param mergeMatches + * @return + */ + def generateStatusColumnWithAllCombinations(mergeMatches: MergeDataSetMatches): Column = { + var exprList = new ArrayBuffer[(Column, Int)]() + val matchList = mergeMatches.matchList + val len = matchList.length + val N = Math.pow(2d, len.toDouble).toInt + var i = 1 + while (i < N) { + var status = 0 + var column: Column = null + val code = Integer.toBinaryString(N | i).substring(1) + var j = 0 + while (j < len) { + if (code.charAt(j) == '1') { + val mergeMatch = matchList(j) + if (column == null) { + if (mergeMatch.getExp.isDefined) { + column = mergeMatch.getExp.get + } + } else { + if (mergeMatch.getExp.isDefined) { + column = column.and(mergeMatch.getExp.get) + } + } + mergeMatch match { + case wm: WhenMatched => + val existsOnBoth = col("exist_on_target").isNotNull.and( + col("exist_on_src").isNotNull) + column = if (column == null) { + existsOnBoth + } else { + column.and(existsOnBoth) + } + case wnm: WhenNotMatched => + val existsOnSrc = col("exist_on_target").isNull.and( + col("exist_on_src").isNotNull) + column = if (column == null) { + existsOnSrc + } else { + column.and(existsOnSrc) + } + case wnm: WhenNotMatchedAndExistsOnlyOnTarget => + val existsOnSrc = col("exist_on_target").isNotNull.and( + col("exist_on_src").isNull) + column = if (column == null) { + existsOnSrc + } else { + column.and(existsOnSrc) + } + case _ => + } + status = status | 1 << j + } + j += 1 + } + if (column == null) { + column = lit(true) === lit(true) + } + exprList += ((column, status)) + i += 1 + } + exprList = exprList.reverse + var condition: Column = null + exprList.foreach { case (col, status) => + if (condition == null) { + condition = when(col, lit(status)) + } else { + condition = condition.when(col, lit(status)) + } + } + condition.otherwise(lit(null)) + } + + private def getSelectExpressionsOnExistingDF(existingDs: Dataset[Row], + mergeMatches: MergeDataSetMatches, sparkSession: SparkSession): Seq[Column] = { + var projects = Seq.empty[Attribute] + val existAttrs = existingDs.queryExecution.analyzed.output + projects ++= selectAttributes(mergeMatches.joinExpr.expr, existingDs, sparkSession) + mergeMatches.matchList.foreach { m => + if (m.getExp.isDefined) { + projects ++= selectAttributes(m.getExp.get.expr, existingDs, sparkSession) + } + m.getActions.foreach { + case u: UpdateAction => + projects ++= existAttrs.filterNot { f => + u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name)) + } + case i: InsertAction => + if (!existAttrs.forall(f => i.insertMap + .exists(_._1.toString().equalsIgnoreCase(f.name)))) { + throw new CarbonMergeDataSetException( + "Not all source columns are mapped for insert action " + i.insertMap) + } + i.insertMap.foreach { case (k, v) => + projects ++= selectAttributes(v.expr, existingDs, sparkSession) + } + case _ => + } + } + projects.map(_.name.toLowerCase).distinct.map { p => + existingDs.col(p) + } + } + + private def updateMappingIfNotExists(mergeMatches: MergeDataSetMatches, + existingDs: Dataset[Row]): MergeDataSetMatches = { + val existAttrs = existingDs.queryExecution.analyzed.output + val updateCommand = mergeMatches.matchList.map { m => + val updateAction = m.getActions.map { + case u: UpdateAction => + if (u.updateMap.isEmpty) { + throw new CarbonMergeDataSetException( + "At least one column supposed to be updated for update action") + } + val attributes = existAttrs.filterNot { f => + u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name)) + } + val newMap = attributes.map(a => 
(existingDs.col(a.name), existingDs.col(a.name))).toMap + u.copy(u.updateMap ++ newMap) + case other => other + } + m.updateActions(updateAction) + } + mergeMatches.copy(matchList = + updateCommand.filterNot(_.getActions.exists(_.isInstanceOf[DeleteAction])) + ++ updateCommand.filter(_.getActions.exists(_.isInstanceOf[DeleteAction]))) + } + + private def selectAttributes(expression: Expression, existingDs: Dataset[Row], + sparkSession: SparkSession, throwError: Boolean = false) = { + expression.collect { + case a: Attribute => + val resolved = existingDs.queryExecution + .analyzed.resolveQuoted(a.name, sparkSession.sessionState.analyzer.resolver) + if (resolved.isDefined) { + resolved.get.toAttribute + } else if (throwError) { + throw new CarbonMergeDataSetException( + expression + " cannot be resolved with dataset " + existingDs) + } else { + null + } + }.filter(_ != null) + } + + private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDatasourceHadoopRelation] = { + plan collect { + case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation] + } + } + + private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = { + val insertHistOfUpdate = mergeMatches.matchList.exists(p => + p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) + && p.getActions.exists(_.isInstanceOf[UpdateAction])) + val insertHistOfDelete = mergeMatches.matchList.exists(p => + p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) + && p.getActions.exists(_.isInstanceOf[DeleteAction])) + (insertHistOfUpdate, insertHistOfDelete) + } + + private def validateMergeActions(mergeMatches: MergeDataSetMatches, + existingDs: Dataset[Row], sparkSession: SparkSession): Unit = { + val existAttrs = existingDs.queryExecution.analyzed.output + if (mergeMatches.matchList.exists(m => m.getActions.exists(_.isInstanceOf[DeleteAction]) + && m.getActions.exists(_.isInstanceOf[UpdateAction]))) { + throw new AnalysisException( + "Delete and update action should not be under same merge condition") + } + if (mergeMatches.matchList.count(m => m.getActions.exists(_.isInstanceOf[DeleteAction])) > 1) { + throw new AnalysisException("Delete action should not be more than once across merge") + } + mergeMatches.matchList.foreach { f => + if (f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])) { + if (!(f.getActions.exists(_.isInstanceOf[UpdateAction]) || + f.getActions.exists(_.isInstanceOf[DeleteAction]))) { + throw new AnalysisException("For inserting to history table, " + + "it should be along with either update or delete action") + } + val value = f.getActions.find(_.isInstanceOf[InsertInHistoryTableAction]).get. 
+ asInstanceOf[InsertInHistoryTableAction] + if (!existAttrs.forall(f => value.insertMap + .exists(_._1.toString().equalsIgnoreCase(f.name)))) { + throw new AnalysisException( + "Not all source columns are mapped for insert action " + value.insertMap) + } + value.insertMap.foreach { case (k, v) => + selectAttributes(v.expr, existingDs, sparkSession, true) + } + } + } + } + + override protected def opName: String = "MERGE DATASET" +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala new file mode 100644 index 0000000..be59b9a --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetException.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +/** + * Exception during merge operation. + */ +class CarbonMergeDataSetException(msg: String, exception: Throwable) + extends Exception(msg, exception) { + + def this(exception: Throwable) { + this("", exception) + } + + def this(msg: String) { + this(msg, null) + } + +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala new file mode 100644 index 0000000..6ef7c37 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/HistoryTableLoadHelper.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.mutation.merge + +import scala.collection.JavaConverters._ + +import org.apache.spark.CarbonInputMetrics +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.DataTypeConverterImpl +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.spark.rdd.CarbonDeltaRowScanRDD +import org.apache.carbondata.spark.readsupport.SparkGenericRowReadSupportImpl + +object HistoryTableLoadHelper { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + + /** + * Load to history table by reading the data from target table using the last transactions. So + * here we read the deleted data from target table by using delta and load them to history table. + */ + def loadHistoryTable(sparkSession: SparkSession, + rltn: CarbonDatasourceHadoopRelation, + carbonTable: CarbonTable, + trxMgr: TranxManager, + mutationAction: MutationAction, + mergeMatches: MergeDataSetMatches): Unit = { + if (!mutationAction.isInstanceOf[HandleUpdateAndDeleteAction]) { + val insert = mergeMatches + .matchList + .filter { f => + f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) + } + .head + .getActions + .find(_.isInstanceOf[InsertInHistoryTableAction]) + .get + .asInstanceOf[InsertInHistoryTableAction] + // Get the history table dataframe. + val histDataFrame: Dataset[Row] = sparkSession.table(insert.historyTable) + // check if the user wants to insert update history records into history table. + val updateDataFrame = if (trxMgr.getUpdateTrx != -1) { + // Get the insertHistoryAction related to update action. + val insertHist = mergeMatches.matchList.filter { f => + f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) && + f.getActions.exists(_.isInstanceOf[UpdateAction]) + }.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head. + asInstanceOf[InsertInHistoryTableAction] + // Create the dataframe to fetch history updated records. + Some(createHistoryDataFrame(sparkSession, rltn, carbonTable, insertHist, + histDataFrame, trxMgr.getUpdateTrx)) + } else { + None + } + // check if the user wants to insert delete history records into history table. + val delDataFrame = if (trxMgr.getDeleteTrx != -1) { + val insertHist = mergeMatches.matchList.filter { f => + f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) && + f.getActions.exists(_.isInstanceOf[DeleteAction]) + }.head.getActions.filter(_.isInstanceOf[InsertInHistoryTableAction]).head. 
+ asInstanceOf[InsertInHistoryTableAction] + Some(createHistoryDataFrame(sparkSession, rltn, carbonTable, insertHist, + histDataFrame: Dataset[Row], trxMgr.getDeleteTrx)) + } else { + None + } + + val unionDf = (updateDataFrame, delDataFrame) match { + case (Some(u), Some(d)) => u.union(d) + case (Some(u), None) => u + case (None, Some(d)) => d + case _ => throw new CarbonMergeDataSetException("Some thing is wrong") + } + + val alias = carbonTable.getTableName + System.currentTimeMillis() + unionDf.createOrReplaceTempView(alias) + val start = System.currentTimeMillis() + sparkSession.sql(s"insert into ${ insert.historyTable.quotedString } " + + s"select * from ${ alias }") + LOGGER.info("Time taken to insert into history table " + (System.currentTimeMillis() - start)) + } + } + + /** + * It creates the dataframe to fetch deleted/updated records in the particular transaction. + */ + private def createHistoryDataFrame(sparkSession: SparkSession, + rltn: CarbonDatasourceHadoopRelation, + carbonTable: CarbonTable, + insertHist: InsertInHistoryTableAction, + histDataFrame: Dataset[Row], + factTimestamp: Long) = { + val rdd1 = new CarbonDeltaRowScanRDD[Row](sparkSession, + carbonTable.getTableInfo.serialize(), + carbonTable.getTableInfo, + null, + new CarbonProjection( + carbonTable.getCreateOrderColumn().asScala.map(_.getColName).toArray), + null, + carbonTable.getAbsoluteTableIdentifier, + new CarbonInputMetrics, + classOf[DataTypeConverterImpl], + classOf[SparkGenericRowReadSupportImpl], + factTimestamp.toString) + + val frame1 = sparkSession.createDataFrame(rdd1, rltn.carbonRelation.schema) + val histOutput = histDataFrame.queryExecution.analyzed.output + val cols = histOutput.map { a => + insertHist.insertMap.find(p => p._1.toString().equalsIgnoreCase(a.name)) match { + case Some((k, v)) => v + case _ => + throw new CarbonMergeDataSetException( + " All columns of history table are mapped in " + insertHist) + } + } + frame1.select(cols: _*) + } + +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala new file mode 100644 index 0000000..2525dcd --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeDataSetBuilder.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, Column, Dataset, Row, SparkSession} +import org.apache.spark.sql.functions.expr + +/** + * Builder class to generate and execute merge + */ +class MergeDataSetBuilder(existingDsOri: Dataset[Row], currDs: Dataset[Row], + joinExpr: Column, sparkSession: SparkSession) { + + def this(existingDsOri: Dataset[Row], currDs: Dataset[Row], + joinExpr: String, sparkSession: SparkSession) { + this(existingDsOri, currDs, expr(joinExpr), sparkSession) + } + + val matchList: util.List[MergeMatch] = new util.ArrayList[MergeMatch]() + + def whenMatched(): MergeDataSetBuilder = { + matchList.add(WhenMatched()) + this + } + + def whenMatched(expression: String): MergeDataSetBuilder = { + matchList.add(WhenMatched(Some(expr(expression)))) + this + } + + def whenMatched(expression: Column): MergeDataSetBuilder = { + matchList.add(WhenMatched(Some(expression))) + this + } + + def whenNotMatched(): MergeDataSetBuilder = { + matchList.add(WhenNotMatched()) + this + } + + def whenNotMatched(expression: String): MergeDataSetBuilder = { + matchList.add(WhenNotMatched(Some(expr(expression)))) + this + } + + def whenNotMatched(expression: Column): MergeDataSetBuilder = { + matchList.add(WhenNotMatched(Some(expression))) + this + } + + def whenNotMatchedAndExistsOnlyOnTarget(): MergeDataSetBuilder = { + matchList.add(WhenNotMatchedAndExistsOnlyOnTarget()) + this + } + + def whenNotMatchedAndExistsOnlyOnTarget(expression: String): MergeDataSetBuilder = { + matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expr(expression)))) + this + } + + def whenNotMatchedAndExistsOnlyOnTarget(expression: Column): MergeDataSetBuilder = { + matchList.add(WhenNotMatchedAndExistsOnlyOnTarget(Some(expression))) + this + } + + def updateExpr(expression: Map[Any, Any]): MergeDataSetBuilder = { + checkBuilder + matchList.get(matchList.size() - 1).addAction(UpdateAction(convertMap(expression))) + this + } + + def insertExpr(expression: Map[Any, Any]): MergeDataSetBuilder = { + checkBuilder + matchList.get(matchList.size() - 1).addAction(InsertAction(convertMap(expression))) + this + } + + def delete(): MergeDataSetBuilder = { + checkBuilder + matchList.get(matchList.size() - 1).addAction(DeleteAction()) + this + } + + def build(): CarbonMergeDataSetCommand = { + checkBuilder + CarbonMergeDataSetCommand(existingDsOri, currDs, + MergeDataSetMatches(joinExpr, matchList.asScala.toList)) + } + + def execute(): Unit = { + build().run(sparkSession) + } + + private def convertMap(exprMap: Map[Any, Any]): Map[Column, Column] = { + if (exprMap.exists{ case (k, v) => + !(checkType(k) && checkType(v)) + }) { + throw new AnalysisException( + "Expression map should only contain either String or Column " + exprMap) + } + def checkType(obj: Any) = obj.isInstanceOf[String] || obj.isInstanceOf[Column] + def convert(obj: Any) = + if (obj.isInstanceOf[Column]) obj.asInstanceOf[Column] else expr(obj.toString) + exprMap.map{ case (k, v) => + (convert(k), convert(v)) + } + } + + private def checkBuilder(): Unit = { + if (matchList.size() == 0) { + throw new AnalysisException("Atleast one matcher should be called before calling an action") + } + } + +} + diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala new file mode 100644 index 0000000..1245bd4 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MergeProjection.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema, InterpretedMutableProjection, Projection} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{DateType, TimestampType} + +import org.apache.carbondata.core.constants.CarbonCommonConstants + +/** + * Creates the projection for each action like update,delete or insert. + */ +case class MergeProjection( + @transient tableCols: Seq[String], + @transient ds: Dataset[Row], + @transient rltn: CarbonDatasourceHadoopRelation, + @transient sparkSession: SparkSession, + @transient mergeAction: MergeAction) { + + private val cutOffDate = Integer.MAX_VALUE >> 1 + + val isUpdate = mergeAction.isInstanceOf[UpdateAction] + val isDelete = mergeAction.isInstanceOf[DeleteAction] + + def apply(row: GenericRowWithSchema): Array[Object] = { + // TODO we can avoid these multiple conversions if this is added as a SparkPlan node. 
+ val values = row.toSeq.map { + case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s) + case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d) + case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b) + case d: Date => DateTimeUtils.fromJavaDate(d) + case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + case value => value + } + + val outputRow = projection(new GenericInternalRow(values.toArray)) + .asInstanceOf[GenericInternalRow] + + val array = outputRow.values.clone() + var i = 0 + while (i < array.length) { + output(i).dataType match { + case d: DateType => + if (array(i) == null) { + array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL + } else { + array(i) = (array(i).asInstanceOf[Int] + cutOffDate) + } + case d: TimestampType => + if (array(i) == null) { + array(i) = CarbonCommonConstants.DIRECT_DICT_VALUE_NULL + } else { + array(i) = (array(i).asInstanceOf[Long] / 1000) + } + + case _ => + } + i += 1 + } + array.asInstanceOf[Array[Object]] + } + + val (projection, output) = generateProjection + + private def generateProjection: (Projection, Array[Expression]) = { + val existingDsOutput = rltn.carbonRelation.schema.toAttributes + val colsMap = mergeAction match { + case UpdateAction(updateMap) => updateMap + case InsertAction(insertMap) => insertMap + case _ => null + } + if (colsMap != null) { + val output = new Array[Expression](tableCols.length) + val expecOutput = new Array[Expression](tableCols.length) + colsMap.foreach { case (k, v) => + val tableIndex = tableCols.indexOf(k.toString().toLowerCase) + if (tableIndex < 0) { + throw new CarbonMergeDataSetException(s"Mapping is wrong $colsMap") + } + output(tableIndex) = v.expr.transform { + case a: Attribute if !a.resolved => + ds.queryExecution.analyzed.resolveQuoted(a.name, + sparkSession.sessionState.analyzer.resolver).get + } + expecOutput(tableIndex) = + existingDsOutput.find(_.name.equalsIgnoreCase(tableCols(tableIndex))).get + } + if (output.contains(null)) { + throw new CarbonMergeDataSetException(s"Not all columns are mapped") + } + (new InterpretedMutableProjection(output, ds.queryExecution.analyzed.output), expecOutput) + } else { + (null, null) + } + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala new file mode 100644 index 0000000..7410686 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/MutationAction.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.mutation.merge + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.sql.execution.command.mutation.DeleteExecution + +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails} +import org.apache.carbondata.processing.loading.FailureCauses + +/** + * It applies mutations such as update and delete delta onto the store. + */ +abstract class MutationAction(sparkSession: SparkSession, carbonTable: CarbonTable) { + + /** + * The RDD of tuple ids and delta status will be processed here to write the delta to the store. + */ + def handleAction(dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) + + protected def handle(sparkSession: SparkSession, + carbonTable: CarbonTable, + factTimestamp: Long, + dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + condition: (Int) => Boolean): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + val update = dataRDD.filter { row => + val status = row.get(1) + status != null && condition(status.asInstanceOf[Int]) + } + val tuple1 = DeleteExecution.deleteDeltaExecutionInternal(Some(carbonTable.getDatabaseName), + carbonTable.getTableName, + sparkSession, update, + factTimestamp.toString, + true, executorErrors, Some(0)) + MutationActionFactory.checkErrors(executorErrors) + val tupleProcessed1 = DeleteExecution.processSegments(executorErrors, tuple1._1, carbonTable, + factTimestamp.toString, tuple1._2) + MutationActionFactory.checkErrors(executorErrors) + tupleProcessed1 + } + +} + +/** + * It applies the update delta records to the store in one transaction. + */ +case class HandleUpdateAction(sparkSession: SparkSession, carbonTable: CarbonTable) + extends MutationAction(sparkSession, carbonTable) { + + override def handleAction(dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this), + dataRDD, executorErrors, (status) => (status == 101) || (status == 102)) + } +} + +/** + * It applies the delete delta records to the store in one transaction. + */ +case class HandleDeleteAction(sparkSession: SparkSession, carbonTable: CarbonTable) + extends MutationAction(sparkSession, carbonTable) { + + override def handleAction(dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this), + dataRDD, executorErrors, (status) => (status == 100) || (status == 102)) + } +} + +/** + * It applies multiple mutations of delta records to the store in multiple transactions.
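+ * The results of the individual actions are combined through CarbonUpdateUtil.mergeSegmentUpdate, + * so a block touched by more than one action keeps a single SegmentUpdateDetails entry.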
+ */ +case class MultipleMutationAction(sparkSession: SparkSession, + carbonTable: CarbonTable, + mutations: Seq[MutationAction]) + extends MutationAction(sparkSession, carbonTable) { + + override def handleAction(dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + var (updates: util.List[SegmentUpdateDetails], segs: Seq[Segment]) = + (new util.ArrayList[SegmentUpdateDetails], Seq.empty[Segment]) + mutations.foreach { m => + val (l, r) = m.handleAction(dataRDD, executorErrors, trxMgr) + l.asScala.foreach { entry => + CarbonUpdateUtil.mergeSegmentUpdate(false, updates, entry) + } + segs ++= r + } + (updates, segs.distinct) + } +} + +/** + * It apply the delete and update delta records to store in a single transaction + */ +case class HandleUpdateAndDeleteAction(sparkSession: SparkSession, carbonTable: CarbonTable) + extends MutationAction(sparkSession, carbonTable) { + + override def handleAction(dataRDD: RDD[Row], + executorErrors: ExecutionErrors, + trxMgr: TranxManager): (util.List[SegmentUpdateDetails], Seq[Segment]) = { + handle(sparkSession, carbonTable, trxMgr.getNextTransaction(this), + dataRDD, executorErrors, (status) => (status == 100) || (status == 101) || (status == 102)) + } +} + +object MutationActionFactory { + + /** + * It is a factory method to generate a respective mutation action for update and delete. + */ + def getMutationAction(sparkSession: SparkSession, + carbonTable: CarbonTable, + hasDelAction: Boolean, + hasUpAction: Boolean, + hasInsrtHistUpd: Boolean, + hasInsrtHistDel: Boolean): MutationAction = { + var actions = Seq.empty[MutationAction] + // If the merge has history insert action then write the delete delta in two separate actions. + // As it is needed to know which are deleted records and which are insert records. + if (hasInsrtHistDel || hasInsrtHistUpd) { + if (hasUpAction) { + actions ++= Seq(HandleUpdateAction(sparkSession, carbonTable)) + } + if (hasDelAction) { + actions ++= Seq(HandleDeleteAction(sparkSession, carbonTable)) + } + } else { + // If there is no history insert action then apply it in single flow. + actions ++= Seq(HandleUpdateAndDeleteAction(sparkSession, carbonTable)) + } + if (actions.length == 1) { + actions.head + } else { + // If it has multiple actions to apply then combine to multi action. + MultipleMutationAction(sparkSession, carbonTable, actions) + } + } + + def checkErrors(executorErrors: ExecutionErrors): Unit = { + // Check for any failures occured during delete delta execution + if (executorErrors.failureCauses != FailureCauses.NONE) { + throw new CarbonMergeDataSetException(executorErrors.errorMsg) + } + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala new file mode 100644 index 0000000..8b5e4e8 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/TranxManager.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +/** + * It manages the transaction number for update or delete operations. Since we are applying update + * and delete delta records in a separate transactions it is required to keep track of transaction + * numbers. + */ +case class TranxManager(factTimestamp: Long) { + + private var newFactTimestamp: Long = factTimestamp + private var mutationMap = Map.empty[MutationAction, Long] + + def getNextTransaction(mutationAction: MutationAction): Long = { + if (mutationMap.isEmpty) { + mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp)) + } else { + if (mutationMap.get(mutationAction).isDefined) { + return mutationMap(mutationAction) + } else { + newFactTimestamp = newFactTimestamp + 1 + mutationMap ++= Map[MutationAction, Long]((mutationAction, newFactTimestamp)) + } + } + newFactTimestamp + } + + def getLatestTrx: Long = newFactTimestamp + + def getUpdateTrx: Long = { + val map = mutationMap.filter(_._1.isInstanceOf[HandleUpdateAction]) + if (map.isEmpty) { + -1 + } else { + map.head._2 + } + } + + def getDeleteTrx: Long = { + val map = mutationMap.filter(_._1.isInstanceOf[HandleDeleteAction]) + if (map.isEmpty) { + -1 + } else { + map.head._2 + } + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala new file mode 100644 index 0000000..91f0322 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/interfaces.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.mutation.merge + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.util.LongAccumulator + +/** + * It describes the type of match like whenmatched or whennotmatched etc., it holds all the actions + * to be done when this match passes. 
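+ * Actions are kept in the order in which they are added through the builder.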
+ */ +abstract class MergeMatch extends Serializable { + + var list: ArrayBuffer[MergeAction] = new ArrayBuffer[MergeAction]() + + def getExp: Option[Column] + + def addAction(action: MergeAction): MergeMatch = { + list += action + this + } + + def getActions: List[MergeAction] = { + list.toList + } + + def updateActions(actions: List[MergeAction]): MergeMatch = { + list = new ArrayBuffer[MergeAction]() + list ++= actions + this + } +} + +/** + * It describes the type of action like update,delete or insert + */ +trait MergeAction extends Serializable + +/** + * It is the holder to keep all the matches and join condition. + */ +case class MergeDataSetMatches(joinExpr: Column, matchList: List[MergeMatch]) extends Serializable + +case class WhenMatched(expression: Option[Column] = None) extends MergeMatch { + override def getExp: Option[Column] = expression +} + +case class WhenNotMatched(expression: Option[Column] = None) extends MergeMatch { + override def getExp: Option[Column] = expression +} + +case class WhenNotMatchedAndExistsOnlyOnTarget(expression: Option[Column] = None) + extends MergeMatch { + override def getExp: Option[Column] = expression +} + +case class UpdateAction(updateMap: Map[Column, Column]) extends MergeAction + +case class InsertAction(insertMap: Map[Column, Column]) extends MergeAction + +/** + * It inserts the history data into history table + */ +case class InsertInHistoryTableAction(insertMap: Map[Column, Column], historyTable: TableIdentifier) + extends MergeAction + +case class DeleteAction() extends MergeAction + +case class Stats(insertedRows: LongAccumulator, + updatedRows: LongAccumulator, + deletedRows: LongAccumulator)
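
For illustration, a minimal usage sketch of the merge builder exposed through the DataSetMerge implicit above. It assumes a CarbonSession is available as spark, a carbon table target_table, a source table source_table, and columns id and value; all of these names are hypothetical and only the builder calls follow the API defined in this commit.

  import org.apache.spark.sql.CarbonSession._
  import org.apache.spark.sql.functions.col

  // The target must be backed by a carbon table; the source can be any dataset.
  val target = spark.table("target_table").as("A")
  val source = spark.table("source_table").as("B")

  target.merge(source, col("A.id").equalTo(col("B.id")))
    // Update the target row when it exists on both sides and the value differs.
    .whenMatched(col("A.value") =!= col("B.value"))
    .updateExpr(Map[Any, Any](col("value") -> col("B.value")))
    // Insert rows that exist only in the source; every target column must be mapped.
    .whenNotMatched()
    .insertExpr(Map[Any, Any](col("id") -> col("B.id"), col("value") -> col("B.value")))
    // Delete target rows that no longer exist in the source.
    .whenNotMatchedAndExistsOnlyOnTarget()
    .delete()
    .execute()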