[CARBONDATA-1380] Fixed updating of the table status file on load failure so that overwrite queries work properly
The table status file is not updated when a load fails. It should be updated with the failure status; otherwise, overwrite queries do not work properly. This closes #1256 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de445bb6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de445bb6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de445bb6 Branch: refs/heads/branch-1.2 Commit: de445bb66c48b9d2db0cf87d03c0af171489644b Parents: d60d973 Author: Ravindra Pesala <ravi.pes...@gmail.com> Authored: Mon Aug 14 18:58:25 2017 +0530 Committer: Ravindra Pesala <ravi.pes...@gmail.com> Committed: Sat Sep 16 22:21:36 2017 +0530 ---------------------------------------------------------------------- .../InsertIntoCarbonTableTestCase.scala | 20 +++++++- .../spark/rdd/CarbonDataRDDFactory.scala | 47 +++++++++++-------- .../spark/rdd/CarbonDataRDDFactory.scala | 48 ++++++++++++-------- 3 files changed, 78 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index d1bf28b..8a084ae 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -54,7 +54,7 @@ class InsertIntoCarbonTableTestCase extends 
QueryTest with BeforeAndAfterAll { checkAnswer( sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbonSource order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNum ber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"), sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from TCarbon order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription") - ) + ) val result = sql("show segments for table TCarbon").collect()(0).get(1).toString() if(!"Success".equalsIgnoreCase(result)) { assert(false) @@ -247,6 +247,24 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { assert(folder.list().length == 1) } + test("Load overwrite fail handle") { + sql("drop table if exists TCarbonSourceOverwrite") + sql("create table TCarbonSourceOverwrite (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADParti tionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, 
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'") + + sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Late st_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')") + val rowCount = sql("select imei from TCarbonSourceOverwrite").count() + try { + sql("LOAD DATA INPATH '" + resourcesPath + + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacF lashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber', 'bad_records_action'='fail')") + assert(false, "Bad records exists and logger action is fail, so it should fail") + } catch { + case e: Exception => + assert(true) + } + sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')") + assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count()) + } + override def afterAll { sql("drop table if exists load") http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 5725717..ef2a917 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -418,6 +418,32 @@ object CarbonDataRDDFactory { } } + def updateStatus(loadStatus: String, + stat: 
Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = { + val metadataDetails = if (stat != null && stat(0) != null) { + stat(0)._2._1 + } else { + new LoadMetadataDetails + } + CarbonLoaderUtil + .populateNewLoadMetaEntry(metadataDetails, + loadStatus, + carbonLoadModel.getFactTimeStamp, + true) + val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, + carbonLoadModel, false, overwriteTable) + if (!status) { + val errorMessage = "Dataload failed due to failure in table status updation." + LOGGER.audit("Data load is failed for " + + s"${ carbonLoadModel.getDatabaseName }.${ + carbonLoadModel + .getTableName + }") + LOGGER.error("Dataload failed due to failure in table status updation.") + throw new Exception(errorMessage) + } + } + try { LOGGER.audit(s"Data load request has been received for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") @@ -888,6 +914,7 @@ object CarbonDataRDDFactory { LOGGER.audit(s"Data load is failed for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.warn("Cannot write load metadata file as data load failed") + updateStatus(loadStatus, status) throw new Exception(errorMessage) } else { // check if data load fails due to bad record and throw data load failure due to @@ -900,28 +927,12 @@ object CarbonDataRDDFactory { LOGGER.info("********clean up done**********") LOGGER.audit(s"Data load is failed for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status) throw new Exception(status(0)._2._2.errorMsg) } - val metadataDetails = status(0)._2._1 if (!isAgg) { writeDictionary(carbonLoadModel, result) - CarbonLoaderUtil - .populateNewLoadMetaEntry(metadataDetails, - loadStatus, - carbonLoadModel.getFactTimeStamp, - true) - val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, - carbonLoadModel, false, overwriteTable) - if (!status) { - val errorMessage = 
"Dataload failed due to failure in table status updation." - LOGGER.audit("Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ - carbonLoadModel - .getTableName - }") - LOGGER.error("Dataload failed due to failure in table status updation.") - throw new Exception(errorMessage) - } + updateStatus(loadStatus, status) } else if (!carbonLoadModel.isRetentionRequest) { // TODO : Handle it LOGGER.info("********Database updated**********") http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 94d7b15..c7b72d5 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -517,6 +517,32 @@ object CarbonDataRDDFactory { } } + def updateStatus(status: Array[(String, (LoadMetadataDetails, ExecutionErrors))], + loadStatus: String) = { + val metadataDetails = if (status != null && status(0) != null) { + status(0)._2._1 + } else { + new LoadMetadataDetails + } + CarbonLoaderUtil + .populateNewLoadMetaEntry(metadataDetails, + loadStatus, + carbonLoadModel.getFactTimeStamp, + true) + val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, + carbonLoadModel, false, overwriteTable) + if (!success) { + val errorMessage = "Dataload failed due to failure in table status updation." 
+ LOGGER.audit("Data load is failed for " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + LOGGER.error("Dataload failed due to failure in table status updation.") + throw new Exception(errorMessage) + } else if (!carbonLoadModel.isRetentionRequest) { + // TODO : Handle it + LOGGER.info("********Database updated**********") + } + } + try { LOGGER.audit(s"Data load request has been received for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") @@ -986,6 +1012,7 @@ object CarbonDataRDDFactory { LOGGER.audit(s"Data load is failed for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.warn("Cannot write load metadata file as data load failed") + updateStatus(status, loadStatus) throw new Exception(errorMessage) } else { // check if data load fails due to bad record and throw data load failure due to @@ -998,6 +1025,7 @@ object CarbonDataRDDFactory { LOGGER.info("********clean up done**********") LOGGER.audit(s"Data load is failed for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) throw new Exception(status(0)._2._2.errorMsg) } // if segment is empty then fail the data load @@ -1009,27 +1037,11 @@ object CarbonDataRDDFactory { s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + " as there is no data to load") LOGGER.warn("Cannot write load metadata file as data load failed") + updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) throw new Exception("No Data to load") } - val metadataDetails = status(0)._2._1 writeDictionary(carbonLoadModel, result, false) - CarbonLoaderUtil - .populateNewLoadMetaEntry(metadataDetails, - loadStatus, - carbonLoadModel.getFactTimeStamp, - true) - val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, - carbonLoadModel, false, overwriteTable) - if (!success) { - val errorMessage = 
"Dataload failed due to failure in table status updation." - LOGGER.audit("Data load is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${carbonLoadModel.getTableName}") - LOGGER.error("Dataload failed due to failure in table status updation.") - throw new Exception(errorMessage) - } else if (!carbonLoadModel.isRetentionRequest) { - // TODO : Handle it - LOGGER.info("********Database updated**********") - } + updateStatus(status, loadStatus) if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) { LOGGER.audit("Data load is partially successful for " +