[CARBONDATA-2014] Update table status for failure only after first entry and IndexOutOfBound Fix
update table status for load failure only after first entry and before calling to update the table status for failure, check whether it is hive partition table in the same way as it is checked while updating in progress status to table status This closes #1769 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0adb32d9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0adb32d9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0adb32d9 Branch: refs/heads/fgdatamap Commit: 0adb32d900230ec3722faa3b06b569a5ce4d2729 Parents: 6123e7f Author: akashrn5 <[email protected]> Authored: Sat Jan 6 16:29:30 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Fri Jan 12 13:43:38 2018 +0530 ---------------------------------------------------------------------- .../command/management/CarbonLoadDataCommand.scala | 10 ++++++++-- .../carbondata/processing/util/CarbonLoaderUtil.java | 8 -------- 2 files changed, 8 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0adb32d9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 60adcd7..1828557 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -166,6 +166,7 @@ case class CarbonLoadDataCommand( // the Fact folder LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + var isUpdateTableStatusRequired = false try { val operationContext = new OperationContext val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = @@ -187,6 +188,7 @@ case class CarbonLoadDataCommand( // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) + isUpdateTableStatusRequired = true } if (isOverwriteTable) { LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") @@ -242,7 +244,9 @@ case class CarbonLoadDataCommand( } catch { case CausedBy(ex: NoRetryException) => // update the load entry in table status file for changing the status to marked for delete - CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) + } LOGGER.error(ex, s"Dataload failure for $dbName.$tableName") throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") // In case of event related exception @@ -251,7 +255,9 @@ case class CarbonLoadDataCommand( case ex: Exception => LOGGER.error(ex) // update the load entry in table status file for changing the status to marked for delete - CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) + } LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs") throw ex } finally { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0adb32d9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index de59982..1991017 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -212,21 +212,13 @@ public final class CarbonLoaderUtil { // existing entry needs to be overwritten as the entry will exist with some // intermediate status int indexToOverwriteNewMetaEntry = 0; - Boolean anyLoadInProgress = false; for (LoadMetadataDetails entry : listOfLoadFolderDetails) { - if (entry.getSegmentStatus().equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { - anyLoadInProgress = true; - } if (entry.getLoadName().equals(newMetaEntry.getLoadName()) && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) { break; } indexToOverwriteNewMetaEntry++; } - if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getSegmentStatus() == - SegmentStatus.MARKED_FOR_DELETE && anyLoadInProgress) { - throw new RuntimeException("It seems insert overwrite has been issued during load"); - } if (insertOverwrite) { for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getSegmentStatus() != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
