Repository: carbondata
Updated Branches:
  refs/heads/master d60d973df -> de445bb66


[CARBONDATA-1380] Record load failures in the table status file so that 
overwrite queries work properly

The table status file is not updated when a load fails. It should be updated 
with the failure status; otherwise, overwrite queries do not work properly.

This closes #1256


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de445bb6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de445bb6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de445bb6

Branch: refs/heads/master
Commit: de445bb66c48b9d2db0cf87d03c0af171489644b
Parents: d60d973
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Mon Aug 14 18:58:25 2017 +0530
Committer: Ravindra Pesala <ravi.pes...@gmail.com>
Committed: Sat Sep 16 22:21:36 2017 +0530

----------------------------------------------------------------------
 .../InsertIntoCarbonTableTestCase.scala         | 20 +++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 47 +++++++++++--------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 48 ++++++++++++--------
 3 files changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index d1bf28b..8a084ae 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -54,7 +54,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
      checkAnswer(
          sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper
 
atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbonSource order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNum
 
ber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
          sql("select 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_oper
 
atorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
 from TCarbon order by 
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,La
 
test_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
-     ) 
+     )
      val result = sql("show segments for table 
TCarbon").collect()(0).get(1).toString()
      if(!"Success".equalsIgnoreCase(result)) {
        assert(false)
@@ -247,6 +247,24 @@ class InsertIntoCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(folder.list().length == 1)
   }
 
+  test("Load overwrite fail handle") {
+    sql("drop table if exists TCarbonSourceOverwrite")
+    sql("create table TCarbonSourceOverwrite (imei string,deviceInformationId 
int,MAC string,deviceColor string,device_backColor string,modelId 
string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADParti
 tionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY 
Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country 
string, Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 
'org.apache.carbondata.format'")
+
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' INTO table 
TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Late
 
st_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    val rowCount = sql("select imei from TCarbonSourceOverwrite").count()
+    try {
+      sql("LOAD DATA INPATH '" + resourcesPath +
+          "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options 
('DELIMITER'=',', 'QUOTECHAR'='\', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacF
 
lashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber',
 'bad_records_action'='fail')")
+      assert(false, "Bad records exists and logger action is fail, so it 
should fail")
+    } catch {
+      case e: Exception =>
+        assert(true)
+    }
+    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO 
table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
+    assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())
+  }
+
 
   override def afterAll {
     sql("drop table if exists load")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5725717..ef2a917 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -418,6 +418,32 @@ object CarbonDataRDDFactory {
       }
     }
 
+    def updateStatus(loadStatus: String,
+        stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = {
+      val metadataDetails = if (stat != null && stat(0) != null) {
+        stat(0)._2._1
+      } else {
+        new LoadMetadataDetails
+      }
+      CarbonLoaderUtil
+        .populateNewLoadMetaEntry(metadataDetails,
+          loadStatus,
+          carbonLoadModel.getFactTimeStamp,
+          true)
+      val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+        carbonLoadModel, false, overwriteTable)
+      if (!status) {
+        val errorMessage = "Dataload failed due to failure in table status 
updation."
+        LOGGER.audit("Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${
+                       carbonLoadModel
+                         .getTableName
+                     }")
+        LOGGER.error("Dataload failed due to failure in table status 
updation.")
+        throw new Exception(errorMessage)
+      }
+    }
+
     try {
       LOGGER.audit(s"Data load request has been received for table" +
           s" ${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
@@ -888,6 +914,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        updateStatus(loadStatus, status)
         throw new Exception(errorMessage)
       } else {
         // check if data load fails due to bad record and throw data load 
failure due to
@@ -900,28 +927,12 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+          updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status)
           throw new Exception(status(0)._2._2.errorMsg)
         }
-        val metadataDetails = status(0)._2._1
         if (!isAgg) {
             writeDictionary(carbonLoadModel, result)
-            CarbonLoaderUtil
-              .populateNewLoadMetaEntry(metadataDetails,
-                loadStatus,
-                carbonLoadModel.getFactTimeStamp,
-                true)
-            val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-              carbonLoadModel, false, overwriteTable)
-            if (!status) {
-              val errorMessage = "Dataload failed due to failure in table 
status updation."
-              LOGGER.audit("Data load is failed for " +
-                           s"${ carbonLoadModel.getDatabaseName }.${
-                             carbonLoadModel
-                               .getTableName
-                           }")
-              LOGGER.error("Dataload failed due to failure in table status 
updation.")
-              throw new Exception(errorMessage)
-            }
+            updateStatus(loadStatus, status)
         } else if (!carbonLoadModel.isRetentionRequest) {
           // TODO : Handle it
           LOGGER.info("********Database updated**********")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de445bb6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 94d7b15..c7b72d5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -517,6 +517,32 @@ object CarbonDataRDDFactory {
       }
     }
 
+    def updateStatus(status: Array[(String, (LoadMetadataDetails, 
ExecutionErrors))],
+        loadStatus: String) = {
+      val metadataDetails = if (status != null && status(0) != null) {
+        status(0)._2._1
+      } else {
+        new LoadMetadataDetails
+      }
+      CarbonLoaderUtil
+        .populateNewLoadMetaEntry(metadataDetails,
+          loadStatus,
+          carbonLoadModel.getFactTimeStamp,
+          true)
+      val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+        carbonLoadModel, false, overwriteTable)
+      if (!success) {
+        val errorMessage = "Dataload failed due to failure in table status 
updation."
+        LOGGER.audit("Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+        LOGGER.error("Dataload failed due to failure in table status 
updation.")
+        throw new Exception(errorMessage)
+      } else if (!carbonLoadModel.isRetentionRequest) {
+        // TODO : Handle it
+        LOGGER.info("********Database updated**********")
+      }
+    }
+
     try {
       LOGGER.audit(s"Data load request has been received for table" +
           s" ${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
@@ -986,6 +1012,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        updateStatus(status, loadStatus)
         throw new Exception(errorMessage)
       } else {
         // check if data load fails due to bad record and throw data load 
failure due to
@@ -998,6 +1025,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception(status(0)._2._2.errorMsg)
         }
         // if segment is empty then fail the data load
@@ -1009,27 +1037,11 @@ object CarbonDataRDDFactory {
                        s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }" +
                        " as there is no data to load")
           LOGGER.warn("Cannot write load metadata file as data load failed")
+          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception("No Data to load")
         }
-        val metadataDetails = status(0)._2._1
         writeDictionary(carbonLoadModel, result, false)
-        CarbonLoaderUtil
-          .populateNewLoadMetaEntry(metadataDetails,
-            loadStatus,
-            carbonLoadModel.getFactTimeStamp,
-            true)
-        val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-          carbonLoadModel, false, overwriteTable)
-        if (!success) {
-          val errorMessage = "Dataload failed due to failure in table status 
updation."
-          LOGGER.audit("Data load is failed for " +
-              s"${ carbonLoadModel.getDatabaseName 
}.${carbonLoadModel.getTableName}")
-          LOGGER.error("Dataload failed due to failure in table status 
updation.")
-          throw new Exception(errorMessage)
-        } else if (!carbonLoadModel.isRetentionRequest) {
-          // TODO : Handle it
-          LOGGER.info("********Database updated**********")
-        }
+        updateStatus(status, loadStatus)
 
         if 
(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
           LOGGER.audit("Data load is partially successful for " +

Reply via email to