Repository: carbondata Updated Branches: refs/heads/master 970a32a26 -> 5e3aec43e
[CARBONDATA-1886] Check and delete stale segment folders on new load. Segment folders are not getting deleted if the corresponding entry is not available in the table status file. Due to this, queries give a higher record count than actual. This closes #1646 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5e3aec43 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5e3aec43 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5e3aec43 Branch: refs/heads/master Commit: 5e3aec43ed65f48a63a396aac016e1ac06ef8589 Parents: 970a32a Author: kunal642 <[email protected]> Authored: Tue Dec 12 16:25:31 2017 +0530 Committer: manishgupta88 <[email protected]> Committed: Wed Dec 13 10:13:03 2017 +0530 ---------------------------------------------------------------------- .../spark/testsuite/dataload/TestLoadDataGeneral.scala | 13 ++++++++++++- .../command/management/CarbonLoadDataCommand.scala | 9 ++++++--- .../processing/loading/TableProcessingOperations.java | 3 +-- 3 files changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala index e3d497a..49f3c5e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala @@ -30,7 +30,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.spark.sql.test.util.QueryTest import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll { @@ -191,6 +190,18 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll { } + test("test if stale folders are deleting on data load") { + sql("drop table if exists stale") + sql("create table stale(a string) stored by 'carbondata'") + sql("insert into stale values('k')") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "stale") + val tableStatusFile = new CarbonTablePath(null, + carbonTable.getTablePath).getTableStatusFilePath + FileFactory.getCarbonFile(tableStatusFile).delete() + sql("insert into stale values('k')") + checkAnswer(sql("select * from stale"), Row("k")) + } + override def afterAll { sql("DROP TABLE if exists loadtest") sql("drop table if exists invalidMeasures") http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index ebdaa33..9f6fce1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ 
-24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute} -import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel} +import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.{CausedBy, FileUtils} @@ -42,9 +42,9 @@ import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format import org.apache.carbondata.processing.exception.DataLoadingException +import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.merger.CompactionType import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel} import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} @@ -139,7 +139,10 @@ case class CarbonLoadDataCommand( carbonLoadModel, hadoopConf ) - + // Delete stale segment folders that are not in table status but are physically present in + // the Fact folder + LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") + TableProcessingOperations.deletePartialLoadDataIfExist(table, false) try { val operationContext = new OperationContext val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index cb53d6e..e2be79c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -74,8 +74,7 @@ public class TableProcessingOperations { CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); boolean found = false; for (int j = 0; j < details.length; j++) { - if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount() - .equals(partitionCount)) { + if (details[j].getLoadName().equals(segmentId)) { found = true; break; }
