Repository: carbondata Updated Branches: refs/heads/master 64b26513f -> 3ff55a2ee
[CARBONDATA-1859][CARBONDATA-1861][PARTITION] Support show and drop partitions This closes #1674 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3ff55a2e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3ff55a2e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3ff55a2e Branch: refs/heads/master Commit: 3ff55a2eef1e2bc0e61ee909b62d485e5240243a Parents: 64b2651 Author: ravipesala <[email protected]> Authored: Sat Dec 16 22:38:00 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Wed Dec 20 23:46:05 2017 +0530 ---------------------------------------------------------------------- .../indexstore/BlockletDataMapIndexStore.java | 15 +- .../blockletindex/BlockletDataMap.java | 26 ++- .../blockletindex/BlockletDataMapModel.java | 13 +- .../core/metadata/PartitionMapFileStore.java | 103 ++++++++++- .../hadoop/api/CarbonOutputCommitter.java | 11 +- .../hadoop/api/CarbonTableInputFormat.java | 7 +- .../hadoop/ft/CarbonOutputMapperTest.java | 5 +- .../StandardPartitionTableDropTestCase.scala | 172 +++++++++++++++++++ .../spark/rdd/CarbonDropPartitionRDD.scala | 140 +++++++++++++++ .../org/apache/spark/sql/CarbonCountStar.scala | 14 +- .../command/mutation/DeleteExecution.scala | 12 +- .../CarbonStandardAlterTableDropPartition.scala | 142 +++++++++++++++ .../table/CarbonCreateTableCommand.scala | 10 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../sql/execution/strategy/DDLStrategy.scala | 25 ++- .../iterator/CarbonOutputIteratorWrapper.java | 4 +- 16 files changed, 660 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 59af50b..f3e55e6 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -72,9 +72,9 @@ public class BlockletDataMapIndexStore BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey); if (dataMap == null) { try { - String segmentPath = CarbonTablePath - .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(), - identifier.getSegmentId()); + String segmentPath = CarbonTablePath.getSegmentPath( + identifier.getAbsoluteTableIdentifier().getTablePath(), + identifier.getSegmentId()); SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); indexFileStore.readAllIIndexOfSegment(segmentPath); PartitionMapFileStore partitionFileStore = new PartitionMapFileStore(); @@ -111,9 +111,9 @@ public class BlockletDataMapIndexStore segmentIndexFileStoreMap.get(identifier.getSegmentId()); PartitionMapFileStore partitionFileStore = partitionFileStoreMap.get(identifier.getSegmentId()); - String segmentPath = CarbonTablePath - .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(), - identifier.getSegmentId()); + String segmentPath = CarbonTablePath.getSegmentPath( + identifier.getAbsoluteTableIdentifier().getTablePath(), + identifier.getSegmentId()); if (indexFileStore == null) { indexFileStore = new SegmentIndexFileStore(); indexFileStore.readAllIIndexOfSegment(segmentPath); @@ -185,7 +185,8 @@ public class BlockletDataMapIndexStore dataMap = new BlockletDataMap(); dataMap.init(new BlockletDataMapModel(identifier.getFilePath(), indexFileStore.getFileData(identifier.getCarbonIndexFileName()), - partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()))); + partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()), + partitionFileStore.isPartionedSegment())); lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap, dataMap.getMemorySize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 70bae32..f1188cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -105,6 +105,8 @@ public class BlockletDataMap implements DataMap, Cacheable { private int[] columnCardinality; + private boolean isPartitionedSegment; + @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { long startTime = System.currentTimeMillis(); @@ -113,6 +115,7 @@ public class BlockletDataMap implements DataMap, Cacheable { DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); List<DataFileFooter> indexInfo = fileFooterConverter .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData()); + isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment(); DataMapRowImpl summaryRow = null; for (DataFileFooter fileFooter : indexInfo) { List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); @@ -394,6 +397,14 @@ public class BlockletDataMap implements DataMap, Cacheable { new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); } + /** + * Creates the schema to store summary information or the information which can be stored only + * once per datamap. It stores datamap level max/min of each column and partition information of + * datamap + * @param segmentProperties + * @param partitions + * @throws MemoryException + */ private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions) throws MemoryException { List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2); @@ -518,8 +529,15 @@ public class BlockletDataMap implements DataMap, Cacheable { } @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) { + // First get the partitions which are stored inside datamap. List<String> storedPartitions = getPartitions(); - if (storedPartitions != null && storedPartitions.size() > 0 && filterExp != null) { + // if it has partitioned datamap but there is no partitioned information stored, it means + // partitions are dropped so return empty list. + if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) { + return new ArrayList<>(); + } + if (storedPartitions != null && storedPartitions.size() > 0) { + // Check the exact match of partition information inside the stored partitions. boolean found = false; if (partitions != null && partitions.size() > 0) { found = partitions.containsAll(storedPartitions); @@ -528,6 +546,7 @@ public class BlockletDataMap implements DataMap, Cacheable { return new ArrayList<>(); } } + // Prune with filters if the partitions are existed in this datamap return prune(filterExp); } @@ -705,16 +724,17 @@ public class BlockletDataMap implements DataMap, Cacheable { } private List<String> getPartitions() { - List<String> partitions = new ArrayList<>(); DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0); if (unsafeRow.getColumnCount() > 2) { + List<String> partitions = new ArrayList<>(); DataMapRow row = unsafeRow.getRow(2); for (int i = 0; i < row.getColumnCount(); i++) { partitions.add( new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)); } + return partitions; } - return partitions; + return null; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index c98ef33..704e0f7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -20,16 +20,23 @@ import java.util.List; import org.apache.carbondata.core.datamap.dev.DataMapModel; +/** + * It is the model object to keep the information to build or initialize BlockletDataMap. + */ public class BlockletDataMapModel extends DataMapModel { private byte[] fileData; private List<String> partitions; - public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions) { + private boolean partitionedSegment; + + public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions, + boolean partitionedSegment) { super(filePath); this.fileData = fileData; this.partitions = partitions; + this.partitionedSegment = partitionedSegment; } public byte[] getFileData() { @@ -39,4 +46,8 @@ public class BlockletDataMapModel extends DataMapModel { public List<String> getPartitions() { return partitions; } + + public boolean isPartitionedSegment() { + return partitionedSegment; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index 853c729..8578cfe 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -25,6 +25,8 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,6 +49,8 @@ import com.google.gson.Gson; public class PartitionMapFileStore { private Map<String, List<String>> partitionMap = new HashMap<>(); + + private boolean partionedSegment = false; /** * Write partitionmapp file to the segment folder with indexfilename and corresponding partitions. * @@ -105,9 +109,9 @@ public class PartitionMapFileStore { * @param segmentPath * @throws IOException */ - public void mergePartitionMapFiles(String segmentPath) throws IOException { + public void mergePartitionMapFiles(String segmentPath, String mergeFileName) throws IOException { CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); - if (partitionFiles != null && partitionFiles.length > 1) { + if (partitionFiles != null && partitionFiles.length > 0) { PartitionMapper partitionMapper = null; for (CarbonFile file : partitionFiles) { PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath()); @@ -119,10 +123,12 @@ public class PartitionMapFileStore { } } if (partitionMapper != null) { - String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT; + String path = segmentPath + "/" + mergeFileName + CarbonTablePath.PARTITION_MAP_EXT; writePartitionFile(partitionMapper, path); for (CarbonFile file : partitionFiles) { - FileFactory.deleteAllCarbonFilesOfDir(file); + if (!FileFactory.deleteAllCarbonFilesOfDir(file)) { + throw new IOException("Old partition map files cannot be deleted"); + } } } } @@ -146,7 +152,7 @@ public class PartitionMapFileStore { * @param partitionMapPath * @return */ - public PartitionMapper readPartitionMap(String partitionMapPath) { + private PartitionMapper readPartitionMap(String partitionMapPath) { Gson gsonObjectToRead = new Gson(); DataInputStream dataInputStream = null; BufferedReader buffReader = null; @@ -176,9 +182,92 @@ public class PartitionMapFileStore { public void readAllPartitionsOfSegment(String segmentPath) { CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); if (partitionFiles != null && partitionFiles.length > 0) { + partionedSegment = true; + int i = 0; + // Get the latest partition map file based on the timestamp of that file. + long [] partitionTimestamps = new long[partitionFiles.length]; for (CarbonFile file : partitionFiles) { - PartitionMapper partitionMapper = readPartitionMap(file.getAbsolutePath()); - partitionMap.putAll(partitionMapper.getPartitionMap()); + partitionTimestamps[i++] = + Long.parseLong(file.getName().substring( + 0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); + } + Arrays.sort(partitionTimestamps); + PartitionMapper partitionMapper = readPartitionMap( + segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1] + + CarbonTablePath.PARTITION_MAP_EXT); + partitionMap.putAll(partitionMapper.getPartitionMap()); + } + } + + public boolean isPartionedSegment() { + return partionedSegment; + } + + /** + * Drops the partitions from the partition mapper file of the segment and writes to a new file. + * @param segmentPath + * @param partitionsToDrop + * @param uniqueId + * @throws IOException + */ + public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId) + throws IOException { + readAllPartitionsOfSegment(segmentPath); + List<String> indexesToDrop = new ArrayList<>(); + for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) { + for (String partition: partitionsToDrop) { + if (entry.getValue().contains(partition)) { + indexesToDrop.add(entry.getKey()); + } + } + } + if (indexesToDrop.size() > 0) { + // Remove the indexes from partition map + for (String indexToDrop : indexesToDrop) { + partitionMap.remove(indexToDrop); + } + PartitionMapper mapper = new PartitionMapper(); + mapper.setPartitionMap(partitionMap); + String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT; + writePartitionFile(mapper, path); + } + } + + /** + * It deletes the old partition mapper files in case of success. And in case of failure it removes + * the old new file. + * @param segmentPath + * @param uniqueId + * @param success + */ + public void commitPartitions(String segmentPath, final String uniqueId, boolean success) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + // write partition info to new file. + if (carbonFile.exists()) { + CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT); + } + }); + CarbonFile latestFile = null; + for (CarbonFile mapFile: carbonFiles) { + if (mapFile.getName().startsWith(uniqueId)) { + latestFile = mapFile; + } + } + if (latestFile != null) { + for (CarbonFile mapFile : carbonFiles) { + if (latestFile != mapFile) { + // Remove old files in case of success scenario + if (success) { + mapFile.delete(); + } + } + } + } + // If it is failure scenario then remove the new file. + if (!success && latestFile != null) { + latestFile.delete(); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index ec42adf..08fd1ac 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.metadata.PartitionMapFileStore; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonLoaderUtil; @@ -75,16 +76,18 @@ public class CarbonOutputCommitter extends FileOutputCommitter { super.commitJob(context); boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + String segmentPath = + CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()); + // Merge all partition files into a single file. + new PartitionMapFileStore().mergePartitionMapFiles(segmentPath, + loadModel.getFactTimeStamp() + ""); LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail(); CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(), true); CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), loadModel.getCarbonDataLoadSchema().getCarbonTable()); CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); - String segmentPath = - CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()); - // Merge all partition files into a single file. - new PartitionMapFileStore().mergePartitionMapFiles(segmentPath); + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 7db9c0a..6a2349a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -42,7 +42,6 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; -import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -918,8 +917,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { * @throws IOException * @throws KeyGenException */ - public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier) - throws IOException, KeyGenException { + public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier, + List<String> partitions) throws IOException { TableDataMap blockletMap = DataMapStoreManager.getInstance() .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName()); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); @@ -931,7 +930,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // TODO: currently only batch segment is supported, add support for streaming table List<String> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments()); - List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, null); + List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions); for (ExtendedBlocklet blocklet : blocklets) { String blockName = blocklet.getPath(); blockName = CarbonTablePath.getCarbonDataFileName(blockName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java index 006ffd2..22a6e53 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java @@ -58,12 +58,13 @@ public class CarbonOutputMapperTest extends TestCase { assert (file.exists()); File[] listFiles = file.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - return name.endsWith(".carbondata") || name.endsWith(".carbonindex"); + return name.endsWith(".carbondata") || + name.endsWith(".carbonindex") || + name.endsWith(".carbonindexmerge"); } }); assert (listFiles.length == 2); - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala new file mode 100644 index 0000000..9a9940b --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.standardpartition + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + dropTable + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + } + + test("show partitions on partition table") { + sql( + """ + | CREATE TABLE partitionshow (designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int, empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionshow OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkExistence(sql(s"""SHOW PARTITIONS partitionshow"""), true, "empno=11", "empno=12") + } + + test("droping on partition table for int partition column") { + sql( + """ + | CREATE TABLE partitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer( + sql(s"""select count (*) from partitionone"""), + sql(s"""select count (*) from originTable""")) + + checkAnswer( + sql(s"""select count (*) from partitionone where empno=11"""), + sql(s"""select count (*) from originTable where empno=11""")) + + sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""") + + checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11") + + checkAnswer( + sql(s"""select count (*) from partitionone where empno=11"""), + Seq(Row(0))) + } + + test("dropping partition on table for more partition columns") { + sql( + """ + | CREATE TABLE partitionmany (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""") + checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479") + checkAnswer( + sql(s"""select count (*) from partitionmany where deptname='Learning'"""), + Seq(Row(0))) + } + + test("dropping all partition on table") { + sql( + """ + | CREATE TABLE partitionall (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='Learning')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='configManagement')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='network')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""") + assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0) + checkAnswer( + sql(s"""select count (*) from partitionall"""), + Seq(Row(0))) + } + + test("dropping static partition on table") { + sql( + """ + | CREATE TABLE staticpartition (empno int, doj Timestamp, + | workgroupcategoryname String, deptno int, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,workgroupcategory int, empname String, designation String) + | PARTITIONED BY (deptname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""") + + checkExistence(sql(s"""SHOW PARTITIONS staticpartition"""), true, "deptname=software") + assert(sql(s"""SHOW PARTITIONS staticpartition""").collect().length == 1) + sql(s"""ALTER TABLE staticpartition DROP PARTITION(deptname='software')""") + checkAnswer( + sql(s"""select count (*) from staticpartition"""), + Seq(Row(0))) + sql(s"""insert into staticpartition select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation,deptname from originTable""") + checkExistence(sql(s"""SHOW PARTITIONS staticpartition"""), true, "deptname=protocol") + checkAnswer( + sql(s"""select count (*) from staticpartition"""), + sql(s"""select count (*) from originTable""")) + + } + + override def afterAll = { + dropTable + } + + def dropTable = { + sql("drop table if exists originTable") + sql("drop table if exists originMultiLoads") + sql("drop table if exists partitionone") + sql("drop table if exists partitionall") + sql("drop table if exists partitionmany") + sql("drop table if exists partitionshow") + sql("drop table if exists staticpartition") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala new file mode 100644 index 0000000..09d9da1 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ + +import org.apache.spark.{Partition, SparkContext, TaskContext} + +import org.apache.carbondata.core.metadata.PartitionMapFileStore +import org.apache.carbondata.core.util.path.CarbonTablePath + +case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String) + extends Partition { + + override val index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx +} + +/** + * RDD to drop the partitions from partition mapper files of all segments. + * @param sc + * @param tablePath + * @param segments segments to be merged + */ +class CarbonDropPartitionRDD( + sc: SparkContext, + tablePath: String, + segments: Seq[String], + partitions: Seq[String], + uniqueId: String) + extends CarbonRDD[String](sc, Nil) { + + override def getPartitions: Array[Partition] = { + segments.zipWithIndex.map {s => + CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) + }.toArray + } + + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { + val iter = new Iterator[String] { + val split = theSplit.asInstanceOf[CarbonDropPartition] + logInfo("Dropping partition information from : " + split.segmentPath) + + new PartitionMapFileStore().dropPartitions( + split.segmentPath, + partitions.toList.asJava, + uniqueId) + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): String = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + "" + } + + } + iter + } + +} + +/** + * This RDD is used for committing the partitions which were removed in before step. It just removes + * old mapper files and related data files. + * @param sc + * @param tablePath + * @param segments segments to be merged + */ +class CarbonDropPartitionRollbackRDD( + sc: SparkContext, + tablePath: String, + segments: Seq[String], + uniqueId: String) + extends CarbonRDD[String](sc, Nil) { + + override def getPartitions: Array[Partition] = { + segments.zipWithIndex.map {s => + CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) + }.toArray + } + + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { + val iter = new Iterator[String] { + val split = theSplit.asInstanceOf[CarbonDropPartition] + logInfo("Commit partition information from : " + split.segmentPath) + + new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, false) + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): String = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + "" + } + + } + iter + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index 0f745cc..833c6fe 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf @@ -24,9 +26,10 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -36,6 +39,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat case class CarbonCountStar( attributesRaw: Seq[Attribute], carbonTable: CarbonTable, + sparkSession: SparkSession, outUnsafeRows: Boolean = true) extends LeafExecNode { override def doExecute(): RDD[InternalRow] = { @@ -45,7 +49,13 @@ case class CarbonCountStar( // get row count val rowCount = CarbonUpdateUtil.getRowCount( - tableInputFormat.getBlockRowCount(job, absoluteTableIdentifier), + tableInputFormat.getBlockRowCount( + job, + absoluteTableIdentifier, + CarbonFilters.getPartitions( + Seq.empty, + sparkSession, + TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).asJava), absoluteTableIdentifier) val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]) val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index b043698..d2e8789 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -92,8 +94,14 @@ object DeleteExecution { if (keyRdd.partitions.length == 0) { return true } - - val blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier) + val blockMappingVO = + carbonInputFormat.getBlockRowCount( + job, + absoluteTableIdentifier, + CarbonFilters.getPartitions( + Seq.empty, + sparkSession, + TableIdentifier(tableName, databaseNameOp)).asJava) val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) CarbonUpdateUtil .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala new file mode 100644 index 0000000..b787aa7 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.partition + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.spark.rdd.{CarbonDropPartitionRDD, CarbonDropPartitionRollbackRDD} + +/** + * Drop the partitions from hive and carbon store. It drops the partitions in following steps + * 1. Drop the partitions from carbon store, it just create one new mapper file in each segment + * with uniqueid. + * 2. Drop partitions from hive. + * 3. In any above step fails then roll back the newly created files + * 4. After success of steps 1 and 2 , it commits the files by removing the old fails. + * Here it does not remove any data from store. During compaction the old data won't be considered. + * @param tableName + * @param specs + * @param ifExists + * @param purge + * @param retainData + */ +case class CarbonStandardAlterTableDropPartition( + tableName: TableIdentifier, + specs: Seq[TablePartitionSpec], + ifExists: Boolean, + purge: Boolean, + retainData: Boolean) + extends AtomicRunnableCommand { + + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + if (table.isHivePartitionTable) { + try { + specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f))) + } catch { + case e: Exception => + if (!ifExists) { + throw e + } else { + log.warn(e.getMessage) + return Seq.empty[Row] + } + } + + // Drop the partitions from hive. + AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) + .run(sparkSession) + } + Seq.empty[Row] + } + + + override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { + AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists) + val msg = s"Got exception $exception when processing data of drop partition." + + "Adding back partitions to the metadata" + LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg) + Seq.empty[Row] + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + var locks = List.empty[ICarbonLock] + val uniqueId = System.currentTimeMillis().toString + try { + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.DROP_TABLE_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.ALTER_PARTITION_LOCK) + locks = AlterTableUtil.validateTableAndAcquireLock( + table.getDatabaseName, + table.getTableName, + locksToBeAcquired)(sparkSession) + val partitionNames = specs.flatMap { f => + f.map(k => k._1 + "=" + k._2) + }.toSet + val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier) + .getValidAndInvalidSegments.getValidSegments + try { + // First drop the partitions from partition mapper files of each segment + new CarbonDropPartitionRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + partitionNames.toSeq, + uniqueId).collect() + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionRollbackRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + uniqueId).collect() + throw e + } + val segmentSet = new util.HashSet[String](new SegmentStatusManager(table + .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) + CarbonUpdateUtil + .updateTableMetadataStatus(segmentSet, + table, + uniqueId, + true, + new util.ArrayList[String]) + DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) + } finally { + AlterTableUtil.releaseLocks(locks) + } + Seq.empty[Row] + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index f86b35f..dac08e0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -98,11 +98,11 @@ case class CarbonCreateTableCommand( val partitionString = if (partitionInfo != null && partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) { - s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map( - _.getColumnName).mkString(",")})" - } else { - "" - } + s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map( + _.getColumnName).mkString(",")})" + } else { + "" + } sparkSession.sql( s"""CREATE TABLE $dbName.$tableName |(${ rawSchema }) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 1e71714..ad519e6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -79,7 +79,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { case CountStarPlan(colAttr, PhysicalOperation(projectList, predicates, l: LogicalRelation)) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) => val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation] - CarbonCountStar(colAttr, relation.carbonTable) :: Nil + CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 21b81ce..eeadbf6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -23,12 +23,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand} -import org.apache.spark.sql.execution.command.partition.CarbonShowCarbonPartitionsCommand +import org.apache.spark.sql.execution.command.partition.{CarbonShowCarbonPartitionsCommand, CarbonStandardAlterTableDropPartition} import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand} import org.apache.spark.sql.execution.datasources.RefreshTable +import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.{CarbonReflectionUtils, FileUtils} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -158,10 +159,30 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(t)(sparkSession) if (isCarbonTable) { - ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable + if (!carbonTable.isHivePartitionTable) { + ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil + } else { + ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil + } } else { ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil } + case adp@AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) => + val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableName)(sparkSession) + if (isCarbonTable) { + ExecutedCommandExec( + CarbonStandardAlterTableDropPartition( + tableName, + specs, + ifExists, + purge, + retainData)) :: Nil + } else { + ExecutedCommandExec(adp) :: Nil + } case set@SetCommand(kv) => ExecutedCommandExec(CarbonSetCommand(set)) :: Nil case reset@ResetCommand => http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index 0cd1331..08d1497 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -70,7 +70,9 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> { if (loadBatch.isLoading()) { try { loadBatch.readyRead(); - queue.put(loadBatch); + if (loadBatch.size > 0) { + queue.put(loadBatch); + } } catch (InterruptedException e) { throw new RuntimeException(e); }
