Repository: carbondata Updated Branches: refs/heads/master 577a8b0d5 -> 9659edccb
[CARBONDATA-1863][PARTITION] Supported clean files for partition table. This closes #1706 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9659edcc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9659edcc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9659edcc Branch: refs/heads/master Commit: 9659edccbd05e9f5499911e6d049d9b9e1cf8c3a Parents: 577a8b0 Author: ravipesala <[email protected]> Authored: Thu Dec 21 10:23:27 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Dec 21 21:50:42 2017 +0530 ---------------------------------------------------------------------- .../blockletindex/SegmentIndexFileStore.java | 2 +- .../core/metadata/PartitionMapFileStore.java | 147 ++++++++++++++-- .../core/writer/CarbonIndexFileMergeWriter.java | 41 +++-- .../StandardPartitionTableCleanTestCase.scala | 167 +++++++++++++++++++ .../StandardPartitionTableDropTestCase.scala | 2 +- .../org/apache/carbondata/api/CarbonStore.scala | 10 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 29 ++-- .../management/CarbonCleanFilesCommand.scala | 15 +- .../management/CarbonLoadDataCommand.scala | 37 ++-- .../datasources/CarbonFileFormat.scala | 9 +- .../loading/model/CarbonLoadModel.java | 6 +- .../processing/util/CarbonLoaderUtil.java | 3 + 12 files changed, 414 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 244e8bb..01cb1d7 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -42,7 +42,7 @@ public class SegmentIndexFileStore { */ private Map<String, byte[]> carbonIndexMap; - public SegmentIndexFileStore() throws IOException { + public SegmentIndexFileStore() { carbonIndexMap = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index b44f99b..d29dfbb 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; @@ -38,8 +39,18 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import 
org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import com.google.gson.Gson; @@ -134,6 +145,31 @@ public class PartitionMapFileStore { } } + private String getPartitionFilePath(String segmentPath) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + if (carbonFile.exists()) { + CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT); + } + }); + if (partitionFiles != null && partitionFiles.length > 0) { + partionedSegment = true; + int i = 0; + // Get the latest partition map file based on the timestamp of that file. 
+ long[] partitionTimestamps = new long[partitionFiles.length]; + for (CarbonFile file : partitionFiles) { + partitionTimestamps[i++] = Long.parseLong(file.getName() + .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); + } + Arrays.sort(partitionTimestamps); + return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1] + + CarbonTablePath.PARTITION_MAP_EXT; + } + } + return null; + } + private CarbonFile[] getPartitionFiles(String segmentPath) { CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); if (carbonFile.exists()) { @@ -179,22 +215,15 @@ public class PartitionMapFileStore { return partitionMapper; } + /** + * Reads all partitions which existed inside the passed segment path + * @param segmentPath + */ public void readAllPartitionsOfSegment(String segmentPath) { - CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); - if (partitionFiles != null && partitionFiles.length > 0) { + String partitionFilePath = getPartitionFilePath(segmentPath); + if (partitionFilePath != null) { partionedSegment = true; - int i = 0; - // Get the latest partition map file based on the timestamp of that file. 
- long [] partitionTimestamps = new long[partitionFiles.length]; - for (CarbonFile file : partitionFiles) { - partitionTimestamps[i++] = - Long.parseLong(file.getName().substring( - 0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); - } - Arrays.sort(partitionTimestamps); - PartitionMapper partitionMapper = readPartitionMap( - segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1] - + CarbonTablePath.PARTITION_MAP_EXT); + PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); partitionMap.putAll(partitionMapper.getPartitionMap()); } } @@ -253,6 +282,96 @@ public class PartitionMapFileStore { } } + /** + * Clean up invalid data after drop partition in all segments of table + * @param table + * @param currentPartitions Current partitions of table + * @param forceDelete Whether it should be deleted force or check the time for an hour creation + * to delete data. + * @throws IOException + */ + public void cleanSegments( + CarbonTable table, + List<String> currentPartitions, + boolean forceDelete) throws IOException { + SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(), + table.getAbsoluteTableIdentifier().getCarbonTableIdentifier()); + + LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath()); + // scan through each segment. + + for (LoadMetadataDetails segment : details) { + + // if this segment is valid then only we will go for deletion of related + // dropped partition files. if the segment is mark for delete or compacted then any way + // it will get deleted. 
+ + if (segment.getSegmentStatus() == SegmentStatus.SUCCESS + || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { + List<String> toBeDeletedIndexFiles = new ArrayList<>(); + List<String> toBeDeletedDataFiles = new ArrayList<>(); + // take the list of files from this segment. + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); + String partitionFilePath = getPartitionFilePath(segmentPath); + if (partitionFilePath != null) { + PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFileStore.readAllIIndexOfSegment(segmentPath); + Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet(); + for (String indexFile : indexFilesFromSegment) { + // Check the partition information in the partition mapper + List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile); + if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) { + Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile + .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, + indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length())); + if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) { + toBeDeletedIndexFiles.add(indexFile); + // Add the corresponding carbondata files to the delete list. 
+ byte[] fileData = indexFileStore.getFileData(indexFile); + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData); + for (DataFileFooter footer : indexInfo) { + toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath()); + } + } + } + } + + if (toBeDeletedIndexFiles.size() > 0) { + indexFilesFromSegment.removeAll(toBeDeletedIndexFiles); + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath, + new ArrayList<String>(indexFilesFromSegment)); + for (String dataFile : toBeDeletedDataFiles) { + FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile)); + } + } + CarbonFile[] partitionFiles = getPartitionFiles(segmentPath); + CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath); + if (partitionFiles != null) { + // Delete all old partition files + for (CarbonFile partitionFile : partitionFiles) { + if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) { + partitionFile.delete(); + } + } + } + partitionMapper = readPartitionMap(partitionFilePath); + if (partitionMapper != null) { + // delete partition map if there is no partition files exist + if (partitionMapper.partitionMap.size() == 0) { + currentPartitionFile.delete(); + } + } + } + } + } + } + public List<String> getPartitions(String indexFileName) { return partitionMap.get(indexFileName); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index 85f08cc..067d024 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -39,36 +39,53 @@ public class CarbonIndexFileMergeWriter { /** * Merge all the carbonindex files of segment to a merged file * @param segmentPath + * @param indexFileNamesTobeAdded while merging, it considers only these files. + * If null, all index files are considered * @throws IOException */ - public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException { + public void mergeCarbonIndexFilesOfSegment( + String segmentPath, + List<String> indexFileNamesTobeAdded) throws IOException { CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); - if (isCarbonIndexFilePresent(indexFiles)) { + if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); fileStore.readAllIIndexOfSegment(segmentPath); - openThriftWriter( - segmentPath + "/" + - System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT); Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap(); MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader(); MergedBlockIndex mergedBlockIndex = new MergedBlockIndex(); List<String> fileNames = new ArrayList<>(indexMap.size()); List<ByteBuffer> data = new ArrayList<>(indexMap.size()); for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) { - fileNames.add(entry.getKey()); - data.add(ByteBuffer.wrap(entry.getValue())); + if (indexFileNamesTobeAdded == null || + indexFileNamesTobeAdded.contains(entry.getKey())) { + fileNames.add(entry.getKey()); + data.add(ByteBuffer.wrap(entry.getValue())); + } + } + if (fileNames.size() > 0) { + openThriftWriter( + segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT); + indexHeader.setFile_names(fileNames); + mergedBlockIndex.setFileData(data); + writeMergedBlockIndexHeader(indexHeader); + writeMergedBlockIndex(mergedBlockIndex); + close(); } - 
indexHeader.setFile_names(fileNames); - mergedBlockIndex.setFileData(data); - writeMergedBlockIndexHeader(indexHeader); - writeMergedBlockIndex(mergedBlockIndex); - close(); for (CarbonFile indexFile : indexFiles) { indexFile.delete(); } } } + /** + * Merge all the carbonindex files of segment to a merged file + * @param segmentPath + * @throws IOException + */ + public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException { + mergeCarbonIndexFilesOfSegment(segmentPath, null); + } + private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) { for (CarbonFile file : indexFiles) { if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala new file mode 100644 index 0000000..2b0dd09 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.standardpartition + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + dropTable + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + } + + def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int, partitionMapFiles: Int): Unit = { + val 
carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, + carbonTable.getTablePath) + val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".carbondata") + } + }) + assert(dataFiles.length == partitions) + val partitionFile = carbonFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".partitionmap") + } + }) + assert(partitionFile.length == partitionMapFiles) + } + + test("clean up partition table for int partition column") { + sql( + """ + | CREATE TABLE partitionone (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer( + sql(s"""select count (*) from partitionone"""), + sql(s"""select count (*) from originTable""")) + + checkAnswer( + sql(s"""select count (*) from partitionone where empno=11"""), + sql(s"""select count (*) from originTable where empno=11""")) + + sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""") + validateDataFiles("default_partitionone", "0", 10, 2) + sql(s"CLEAN FILES FOR TABLE partitionone").show() + + checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11") + validateDataFiles("default_partitionone", "0", 9, 1) + checkAnswer( + sql(s"""select count (*) from 
partitionone where empno=11"""), + Seq(Row(0))) + + } + + test("clean up partition on table for more partition columns") { + sql( + """ + | CREATE TABLE partitionmany (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""") + validateDataFiles("default_partitionmany", "0", 10, 2) + validateDataFiles("default_partitionmany", "1", 10, 2) + sql(s"CLEAN FILES FOR TABLE partitionmany").show() + validateDataFiles("default_partitionmany", "0", 8, 1) + validateDataFiles("default_partitionmany", "1", 8, 1) + checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479") + checkAnswer( + sql(s"""select count (*) from partitionmany where deptname='Learning'"""), + Seq(Row(0))) + } + + test("clean up after dropping all partition on table") { + sql( + """ + | CREATE TABLE partitionall (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO 
TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='Learning')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='configManagement')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='network')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""") + sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""") + assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0) + validateDataFiles("default_partitionall", "0", 10, 6) + sql(s"CLEAN FILES FOR TABLE partitionall").show() + validateDataFiles("default_partitionall", "0", 0, 0) + checkAnswer( + sql(s"""select count (*) from partitionall"""), + Seq(Row(0))) + } + + override def afterAll = { + dropTable + } + + def dropTable = { + sql("drop table if exists originTable") + sql("drop table if exists originMultiLoads") + sql("drop table if exists partitionone") + sql("drop table if exists partitionall") + sql("drop table if exists partitionmany") + sql("drop table if exists partitionshow") + sql("drop table if exists staticpartition") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala index 2a25255..9a9940b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -156,7 +156,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl } override def afterAll = { -// dropTable + dropTable } def dropTable = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 2b127e4..d514f77 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -30,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMapFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager @@ -103,7 +103,8 @@ object CarbonStore { tableName: String, storePath: String, carbonTable: CarbonTable, - forceTableClean: Boolean): Unit = { + forceTableClean: Boolean, + currentTablePartitions: Option[Seq[String]] = None): Unit = { LOGGER.audit(s"The clean files request has been received for $dbName.$tableName") var carbonCleanFilesLock: ICarbonLock = null var absoluteTableIdentifier: AbsoluteTableIdentifier = null @@ 
-128,6 +129,11 @@ object CarbonStore { DataLoadingUtil.deleteLoadsAndUpdateMetadata( isForceDeletion = true, carbonTable) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) + currentTablePartitions match { + case Some(partitions) => + new PartitionMapFileStore().cleanSegments(carbonTable, partitions.asJava, true) + case _ => + } } } finally { if (carbonCleanFilesLock != null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index f78412b..4c405a4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -157,16 +157,25 @@ object CarbonScalaUtil { def convertToUTF8String(value: String, dataType: DataType, timeStampFormat: SimpleDateFormat, - dateFormat: SimpleDateFormat): UTF8String = { - dataType match { - case TimestampType => - UTF8String.fromString( - DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000)) - case DateType => - UTF8String.fromString( - DateTimeUtils.dateToString( - (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt)) - case _ => UTF8String.fromString(value) + dateFormat: SimpleDateFormat, + serializationNullFormat: String): UTF8String = { + if (value == null || serializationNullFormat.equals(value)) { + return UTF8String.fromString(value) + } + try { + dataType match { + case TimestampType => + UTF8String.fromString( + DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000)) + case DateType => + UTF8String.fromString( + DateTimeUtils.dateToString( + 
(dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt)) + case _ => UTF8String.fromString(value) + } + } catch { + case e: Exception => + UTF8String.fromString(value) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index e0530f6..342acd4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.command.management import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.core.util.CarbonProperties @@ -76,13 +79,21 @@ case class CarbonCleanFilesCommand( private def cleanGarbageData(sparkSession: SparkSession, databaseNameOp: Option[String], tableName: String): Unit = { val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - + val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) { + Some(CarbonFilters.getPartitions( + Seq.empty[Expression], + sparkSession, + TableIdentifier(tableName, databaseNameOp))) + } else { + None + } CarbonStore.cleanFiles( CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), 
tableName, CarbonProperties.getStorePath, carbonTable, - forceTableClean) + forceTableClean, + partitions) } private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index f96c0a7..7285d9d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -34,14 +34,15 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.CarbonRelation +import 
org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} @@ -52,6 +53,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider +import org.apache.carbondata.core.metadata.PartitionMapFileStore import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension @@ -453,16 +455,28 @@ case class CarbonLoadDataCommand( hadoopConf: Configuration, dataFrame: Option[DataFrame]) = { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) val logicalPlan = sparkSession.sessionState.catalog.lookupRelation( - TableIdentifier(table.getTableName, Some(table.getDatabaseName))) + identifier) val relation = logicalPlan.collect { case l: LogicalRelation => l - case c: CatalogRelation => c + case c // To stay compatible with Spark 2.1 and 2.2 we need to compare classes + if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || + c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || + c.getClass.getName.equals( + "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => c }.head - + // Clean up the old invalid segment data. 
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) + val currentPartitions = + CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier) + // Clean up the already dropped partitioned data + new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false) // Converts the data to carbon understandable format. The timestamp/date format data needs to // converted to hive standard format to let spark understand the data to partition. + val serializationNullFormat = + carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) val query: LogicalPlan = if (dataFrame.isDefined) { var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT val timeStampFormat = new SimpleDateFormat(timeStampformatString) @@ -545,13 +559,15 @@ case class CarbonLoadDataCommand( jobConf).map{ case (key, value) => val data = new Array[Any](len) var i = 0 - while (i < len) { + val input = value.get() + while (i < input.length) { // TODO find a way to avoid double conversion of date and time. 
data(i) = CarbonScalaUtil.convertToUTF8String( - value.get()(i), + input(i), rowDataTypes(i), timeStampFormat, - dateFormat) + dateFormat, + serializationNullFormat) i = i + 1 } InternalRow.fromSeq(data) @@ -576,7 +592,7 @@ case class CarbonLoadDataCommand( isOverwriteTable, carbonLoadModel, sparkSession) - case c: CatalogRelation => + case others => val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable( "tableMeta", relation).asInstanceOf[CatalogTable] @@ -587,7 +603,7 @@ case class CarbonLoadDataCommand( val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) convertToLogicalRelation( catalogTable, - c.output, + others.output, sizeInBytes, isOverwriteTable, carbonLoadModel, @@ -642,6 +658,7 @@ case class CarbonLoadDataCommand( options += (("onepass", loadModel.getUseOnePass.toString)) options += (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) + options ++= this.options if (updateModel.isDefined) { options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index a95693c..b9df7a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -189,7 +189,14 @@ private class CarbonOutputWriter(path: String, extends OutputWriter with AbstractCarbonOutputWriter { val partitions = getPartitionsFromPath(path, 
context).map(ExternalCatalogUtils.unescapePathName) val partitionData = if (partitions.nonEmpty) { - partitions.map(_.split("=")(1)) + partitions.map{ p => + val splitData = p.split("=") + if (splitData.length > 1) { + splitData(1) + } else { + "" + } + } } else { Array.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 3031b8e..8a295d9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -564,7 +564,11 @@ public class CarbonLoadModel implements Serializable { * @return */ public LoadMetadataDetails getCurrentLoadMetadataDetail() { - return loadMetadataDetails.get(loadMetadataDetails.size() - 1); + if (loadMetadataDetails != null && loadMetadataDetails.size() > 0) { + return loadMetadataDetails.get(loadMetadataDetails.size() - 1); + } else { + return null; + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index fef6930..49ca254 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -393,6 +393,9 @@ public final class CarbonLoaderUtil { 
SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE; // always the last entry in the load metadata details will be the current load entry LoadMetadataDetails loadMetaEntry = model.getCurrentLoadMetadataDetail(); + if (loadMetaEntry == null) { + return; + } CarbonLoaderUtil .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true); boolean entryAdded =
