marchpure commented on a change in pull request #3799:
URL: https://github.com/apache/carbondata/pull/3799#discussion_r443452016



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
 
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {
+            stageLoadingFile.createNewFile();
+          } else {
+            stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          }
+        }
+      })
+    }.filter { future =>
+      future.get()
+    }
+    stageFiles
+  }
+
+  /**
+   * create '.loading' file with retry
+   */
+  private def createStageLoadingFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+    while (createStageLoadingFiles(executorService, stageFiles).length > 0 && 
retry > 0) {

Review comment:
       Checked. The loop should continue here.
`createStageLoadingFiles(executorService, stageFiles).length` is the number of
stages that failed to be tagged as 'loading'. If that length is > 0, we should
keep looping and retry tagging those stages as 'loading'.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to