This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 07a3637  [CARBONDATA-3944] Insert Stage interrupted when IOException 
happen
07a3637 is described below

commit 07a3637201f0dbbd2c85906bfc53039546d26c1f
Author: haomarch <[email protected]>
AuthorDate: Fri Aug 7 18:14:35 2020 +0800

    [CARBONDATA-3944] Insert Stage interrupted when IOException happen
    
    Why is this PR needed?
    In the insert stage flow, the stage files are deleted with a retry
    mechanism. But when an IOException happens (e.g. due to an abnormal network),
    the insert stage is interrupted, which is unexpected.
    
    What changes were proposed in this PR?
    When an IOException is caught in the insert stage flow, continue to retry.
    
    Does this PR introduce any user interface change?
    No
    Is any new testcase added?
    No
    
    This closes #3886
---
 .../management/CarbonInsertFromStageCommand.scala  | 71 +++++++++++++---------
 1 file changed, 41 insertions(+), 30 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 52c7807..d51c3d5 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import java.io.{InputStreamReader, IOException}
+import java.io.{File, InputStreamReader, IOException}
 import java.util
 import java.util.Collections
 import java.util.concurrent.{Callable, Executors, ExecutorService}
@@ -159,7 +159,7 @@ case class CarbonInsertFromStageCommand(
       // '.loading' suffix filename
       val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
       executorService = Executors.newFixedThreadPool(numThreads)
-      createStageLoadingFilesWithRetry(executorService, stageFiles)
+      createStageLoadingFilesWithRetry(table.getStagePath, executorService, 
stageFiles)
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert 
${table.getDatabaseName}.${table.getTableName}", ex)
@@ -181,7 +181,7 @@ case class CarbonInsertFromStageCommand(
       }
 
       // 4) delete stage files
-      deleteStageFilesWithRetry(executorService, stageFiles)
+      deleteStageFilesWithRetry(table.getStagePath, executorService, 
stageFiles)
 
       // 5) delete the snapshot file
       deleteSnapShotFileWithRetry(table, snapshotFilePath)
@@ -499,25 +499,30 @@ case class CarbonInsertFromStageCommand(
    * return the loading files failed to create
    */
   private def createStageLoadingFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
-          // Get the loading files path
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath +
-              CarbonTablePath.LOADING_FILE_SUFFIX);
-          // Try to create loading files
-          // make isFailed to be true if createNewFile return false.
-          // the reason can be file exists or exceptions.
-          var isFailed = !stageLoadingFile.createNewFile()
-          // if file exists, modify the lastModifiedTime of the file.
-          if (isFailed) {
-            // make isFailed to be true if setLastModifiedTime return false.
-            isFailed = 
!stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          try {
+            // Get the loading files path
+            val stageLoadingFile =
+              FileFactory.getCarbonFile(stagePath +
+                File.separator + files._1.getName + 
CarbonTablePath.LOADING_FILE_SUFFIX);
+            // Try to create loading files
+            // make isFailed to be true if createNewFile return false.
+            // the reason can be file exists or exceptions.
+            var isFailed = !stageLoadingFile.createNewFile()
+            // if file exists, modify the lastmodifiedtime of the file.
+            if (isFailed) {
+              // make isFailed to be true if setLastModifiedTime return false.
+              isFailed = 
!stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+            }
+            (files._1, files._2, isFailed)
+          } catch {
+            case _ : Exception => (files._1, files._2, true)
           }
-          (files._1, files._2, isFailed)
         }
       })
     }.map { future =>
@@ -534,6 +539,7 @@ case class CarbonInsertFromStageCommand(
    * create '.loading' file with retry
    */
   private def createStageLoadingFilesWithRetry(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
     val startTime = System.currentTimeMillis()
@@ -541,7 +547,7 @@ case class CarbonInsertFromStageCommand(
     var needToCreateStageLoadingFiles = stageFiles
     while (retry > 0 && needToCreateStageLoadingFiles.nonEmpty) {
       needToCreateStageLoadingFiles =
-        createStageLoadingFiles(executorService, needToCreateStageLoadingFiles)
+        createStageLoadingFiles(stagePath, executorService, 
needToCreateStageLoadingFiles)
       retry -= 1
     }
     LOGGER.info(s"finished to create stage loading files, time taken: " +
@@ -557,25 +563,29 @@ case class CarbonInsertFromStageCommand(
    * Return the files failed to delete
    */
   private def deleteStageFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
           // Delete three types of file: stage|.success|.loading
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath
-              + CarbonTablePath.LOADING_FILE_SUFFIX);
-          var isFailed = false
-          // If delete() return false, maybe the reason is FileNotFount or 
FileFailedClean.
-          // Considering FileNotFound means file clean successfully.
-          // We need double check the file exists or not when delete() return 
false.
-          if (!(files._1.delete() && files._2.delete() && 
stageLoadingFile.delete())) {
-            // If the file still exists,  make isFailed to be true
-            // So we can retry to delete this file.
-            isFailed = files._1.exists() || files._1.exists() || 
stageLoadingFile.exists()
+          try {
+            val stageLoadingFile = FileFactory.getCarbonFile(stagePath +
+              File.separator + files._1.getName + 
CarbonTablePath.LOADING_FILE_SUFFIX);
+            var isFailed = false
+            // If delete() return false, maybe the reason is FileNotFount or 
FileFailedClean.
+            // Considering FileNotFound means FileCleanSucessfully.
+            // We need double check the file exists or not when delete() 
return false.
+            if (!files._1.delete() || !files._2.delete() || 
!stageLoadingFile.delete()) {
+              // If the file still exists,  make isFailed to be true
+              // So we can retry to delete this file.
+              isFailed = files._1.exists() || files._1.exists() || 
stageLoadingFile.exists()
+            }
+            (files._1, files._2, isFailed)
+          } catch {
+            case _: Exception => (files._1, files._2, true)
           }
-          (files._1, files._2, isFailed)
         }
       })
     }.map { future =>
@@ -592,6 +602,7 @@ case class CarbonInsertFromStageCommand(
    * Delete stage file and success file with retry
    */
   private def deleteStageFilesWithRetry(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
     val startTime = System.currentTimeMillis()
@@ -599,7 +610,7 @@ case class CarbonInsertFromStageCommand(
     var needToDeleteStageFiles = stageFiles
     while (retry > 0 && needToDeleteStageFiles.nonEmpty) {
       needToDeleteStageFiles =
-        deleteStageFiles(executorService, needToDeleteStageFiles)
+        deleteStageFiles(stagePath, executorService, needToDeleteStageFiles)
       retry -= 1
     }
     LOGGER.info(s"finished to delete stage files, time taken: " +

Reply via email to