This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e823f3  [CARBONDATA-4016] NPE and FileNotFound in Show Segments and 
Insert Stage
7e823f3 is described below

commit 7e823f38ffcbed4b68fdc7d430bb9ce6b3172d61
Author: haomarch <[email protected]>
AuthorDate: Tue Sep 29 21:31:40 2020 +0800

    [CARBONDATA-4016] NPE and FileNotFound in Show Segments and Insert Stage
    
    Why is this PR needed?
    1. Insert Stage: while Spark reads stage files that Flink is writing at the
    same time, a JSON format exception may be thrown because the stage file is
    still being written.
    2. Show Segments with STAGE: when reading stage files that are being written
    by Flink or deleted by Spark, a JSON format exception will be thrown because
    the stage file is still being written.
    3. Show Segments loads partition info even for non-partitioned tables, which
    leads to bad performance and should be avoided.
    4. In getLastModifiedTime of TableStatus, if the timestamp is empty,
    getLastModifiedTime throws an exception. Following the behavior of
    getLoadEndTime(), it should return -1 instead.
    
    What changes were proposed in this PR?
    1. Insert Stage, add RETRY to read stage file.
    2. Show Segments with STAGE, add RETRY to read stage.
    3. Show Segment load partition only for partition table
    4. getLastModifiedTime return -1 if the timestamp is empty.
    
    This closes #3965
---
 .../core/statusmanager/LoadMetadataDetails.java    |  5 +-
 .../org/apache/carbondata/api/CarbonStore.scala    | 58 ++++++++++++++++------
 .../management/CarbonDeleteStageFilesCommand.scala | 15 ++++--
 .../management/CarbonInsertFromStageCommand.scala  | 39 ++++++++++++---
 .../CarbonShowSegmentsAsSelectCommand.scala        |  9 +++-
 .../management/CarbonShowSegmentsCommand.scala     | 18 ++++---
 6 files changed, 110 insertions(+), 34 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 0527492..b8ddf0b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -492,6 +492,9 @@ public class LoadMetadataDetails implements Serializable {
     if (!StringUtils.isEmpty(updateDeltaEndTimestamp)) {
       return convertTimeStampToLong(updateDeltaEndTimestamp);
     }
-    return convertTimeStampToLong(timestamp);
+    if (!StringUtils.isEmpty(timestamp)) {
+      return convertTimeStampToLong(timestamp);
+    }
+    return CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT;
   }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index c45143a..480e357 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,16 +17,16 @@
 
 package org.apache.carbondata.api
 
-import java.io.InputStreamReader
+import java.io.{DataInputStream, File, FileNotFoundException, 
InputStreamReader}
 import java.time.{Duration, Instant}
 import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -47,6 +47,10 @@ import org.apache.carbondata.streaming.segment.StreamSegment
 object CarbonStore {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
+  val READ_FILE_RETRY_TIMES = 3
+
+  val READ_FILE_RETRY_INTERVAL = 1000
+
   def readSegments(
       tablePath: String,
       showHistory: Boolean,
@@ -77,12 +81,12 @@ object CarbonStore {
   /**
    * Read stage files and return input files
    */
-  def readStages(tablePath: String): Seq[StageInput] = {
-    val stageFiles = listStageFiles(CarbonTablePath.getStageDir(tablePath))
+  def readStages(tableStagePath: String): Seq[StageInput] = {
+    val stageFiles = listStageFiles(tableStagePath)
     var output = Collections.synchronizedList(new util.ArrayList[StageInput]())
-    output.addAll(readStageInput(stageFiles._1,
+    output.addAll(readStageInput(tableStagePath, stageFiles._1,
       StageInput.StageStatus.Unload).asJavaCollection)
-    output.addAll(readStageInput(stageFiles._2,
+    output.addAll(readStageInput(tableStagePath, stageFiles._2,
       StageInput.StageStatus.Loading).asJavaCollection)
     Collections.sort(output, new Comparator[StageInput]() {
       def compare(stageInput1: StageInput, stageInput2: StageInput): Int = {
@@ -96,20 +100,46 @@ object CarbonStore {
    * Read stage files and return input files
    */
   def readStageInput(
+      tableStagePath: String,
       stageFiles: Seq[CarbonFile],
       status: StageInput.StageStatus): Seq[StageInput] = {
     val gson = new Gson()
     val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
-    stageFiles.map { stage =>
-      val filePath = stage.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
+    stageFiles.foreach { stage =>
+      val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + 
stage.getName
+      var stream: DataInputStream = null
       try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
-        stageInput.setCreateTime(stage.getLastModifiedTime)
-        stageInput.setStatus(status)
-        output.add(stageInput)
+        stream = FileFactory.getDataInputStream(filePath)
+        var retry = READ_FILE_RETRY_TIMES
+        breakable {
+          while (retry > 0) {
+            try {
+              val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
+              stageInput.setCreateTime(stage.getLastModifiedTime)
+              stageInput.setStatus(status)
+              output.add(stageInput)
+              break()
+            } catch {
+              case _ : FileNotFoundException =>
+                LOGGER.warn(s"The stage file $filePath does not exist")
+                break()
+              case ex: Exception => retry -= 1
+                if (retry > 0) {
+                  LOGGER.warn(s"The stage file $filePath can't be read, retry 
" +
+                    s"$retry times: ${ex.getMessage}")
+                  Thread.sleep(READ_FILE_RETRY_INTERVAL)
+                } else {
+                  LOGGER.error(s"The stage file $filePath can't be" +
+                    s" read: ${ex.getMessage}")
+                  throw ex
+                }
+            }
+          }
+        }
       } finally {
-        stream.close()
+        if (stream != null) {
+          stream.close()
+        }
       }
     }
     output.asScala
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
index 2432340..0fc098c 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import java.io.InputStreamReader
+import java.io.{DataInputStream, InputStreamReader}
 import java.util
 import java.util.Collections
 
@@ -63,6 +63,7 @@ case class CarbonDeleteStageFilesCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on 
materialized view")
     }
     val tablePath = table.getTablePath
+    val tableStagePath = table.getStagePath
     val startTime = System.currentTimeMillis()
     val stageDataFileActiveTime = try {
       Integer.valueOf(options.getOrElse("retain_hour", "0")) * 3600000
@@ -76,7 +77,8 @@ case class CarbonDeleteStageFilesCommand(
         "Option [retain_hour] is negative.")
     }
     val stageDataFilesReferenced =
-      listStageDataFilesReferenced(listStageMetadataFiles(tablePath, 
configuration), configuration)
+      listStageDataFilesReferenced(listStageMetadataFiles(tablePath, 
configuration),
+        tableStagePath, configuration)
     val stageDataFiles = listStageDataFiles(tablePath, configuration)
     stageDataFiles.collect {
       case stageDataFile: CarbonFile =>
@@ -129,6 +131,7 @@ case class CarbonDeleteStageFilesCommand(
    */
   private def listStageDataFilesReferenced(
       stageFiles: Seq[CarbonFile],
+      tableStagePath: String,
       configuration: Configuration
   ): Set[String] = {
     if (stageFiles.isEmpty) {
@@ -138,8 +141,10 @@ case class CarbonDeleteStageFilesCommand(
     val stageDataFilesReferenced = Collections.synchronizedSet(new 
util.HashSet[String]())
     val startTime = System.currentTimeMillis()
     stageFiles.foreach { stageFile =>
-      val stream = FileFactory.getDataInputStream(stageFile.getAbsolutePath, 
configuration)
+      val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR + 
stageFile.getName
+      var stream: DataInputStream = null
       try {
+        stream = FileFactory.getDataInputStream(filePath, configuration)
         val stageInput =
           new Gson().fromJson(new InputStreamReader(stream), 
classOf[StageInput])
         val stageDataBase = stageInput.getBase + 
CarbonCommonConstants.FILE_SEPARATOR
@@ -171,7 +176,9 @@ case class CarbonDeleteStageFilesCommand(
           )
         }
       } finally {
-        stream.close()
+        if (stream != null) {
+          stream.close()
+        }
       }
     }
     LOGGER.info(s"Read stage files taken ${ System.currentTimeMillis() - 
startTime }ms.")
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index d51c3d5..bf3d6d0 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import java.io.{File, InputStreamReader, IOException}
+import java.io.{DataInputStream, File, InputStreamReader, IOException}
 import java.util
 import java.util.Collections
 import java.util.concurrent.{Callable, Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
@@ -171,7 +172,7 @@ case class CarbonInsertFromStageCommand(
     try{
       // 2) read all stage files to collect input files for data loading
       // create a thread pool to read them
-      val stageInputs = collectStageInputs(executorService, stageFiles)
+      val stageInputs = collectStageInputs(executorService, stagePath, 
stageFiles)
 
       // 3) perform data loading
       if (table.isHivePartitionTable) {
@@ -469,6 +470,7 @@ case class CarbonInsertFromStageCommand(
    */
   private def collectStageInputs(
       executorService: ExecutorService,
+      tableStagePath: String,
       stageFiles: Array[(CarbonFile, CarbonFile)]
   ): Seq[StageInput] = {
     val startTime = System.currentTimeMillis()
@@ -477,13 +479,34 @@ case class CarbonInsertFromStageCommand(
     stageFiles.map { stage =>
       executorService.submit(new Runnable {
         override def run(): Unit = {
-          val filePath = stage._1.getAbsolutePath
-          val stream = FileFactory.getDataInputStream(filePath)
+          val filePath = tableStagePath + CarbonCommonConstants.FILE_SEPARATOR 
+ stage._1.getName
+          var stream: DataInputStream = null
           try {
-            val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
-            output.add(stageInput)
+            stream = FileFactory.getDataInputStream(filePath)
+            var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+            breakable (
+              while (retry > 0) {
+                try {
+                  val stageInput = gson.fromJson(new 
InputStreamReader(stream), classOf[StageInput])
+                  output.add(stageInput)
+                  break()
+                } catch {
+                  case ex: Exception => retry -= 1
+                    if (retry > 0) {
+                      LOGGER.warn(s"The stage file $filePath can't be read, 
retry " +
+                        s"$retry times: ${ex.getMessage}")
+                      
Thread.sleep(CarbonInsertFromStageCommand.DELETE_FILES_RETRY_INTERVAL)
+                    } else {
+                      LOGGER.error(s"The stage file $filePath can't be read: 
${ex.getMessage}")
+                      throw ex
+                    }
+                }
+              }
+            )
           } finally {
-            stream.close()
+            if (stream != null) {
+              stream.close()
+            }
           }
         }
       })
@@ -745,6 +768,8 @@ object CarbonInsertFromStageCommand {
 
   val DELETE_FILES_RETRY_TIMES = 3
 
+  val DELETE_FILES_RETRY_INTERVAL = 1000
+
   val BATCH_FILE_COUNT_KEY = "batch_file_count"
 
   val BATCH_FILE_COUNT_DEFAULT: String = Integer.MAX_VALUE.toString
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
index 66a8b9a..fa422a2 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -73,10 +73,11 @@ case class CarbonShowSegmentsAsSelectCommand(
   override protected def opName: String = "SHOW SEGMENTS"
 
   private def createDataFrame: DataFrame = {
+    val tableStagePath = carbonTable.getStagePath
     val tablePath = carbonTable.getTablePath
     var rows: Seq[SegmentRow] = Seq()
     if (withStage) {
-      val stageRows = CarbonShowSegmentsCommand.showStages(tablePath)
+      val stageRows = CarbonShowSegmentsCommand.showStages(tableStagePath)
       if (stageRows.nonEmpty) {
         rows = stageRows.map(
           stageRow =>
@@ -134,7 +135,11 @@ case class CarbonShowSegmentsAsSelectCommand(
       val endTime = CarbonStore.getLoadEndTime(segment)
       val timeTaken = CarbonStore.getLoadTimeTakenAsMillis(segment)
       val (dataSize, indexSize) = CarbonStore.getDataAndIndexSize(tablePath, 
segment)
-      val partitions = CarbonStore.getPartitions(tablePath, segment)
+      val partitions = if (carbonTable.isHivePartitionTable) {
+        CarbonStore.getPartitions(tablePath, segment)
+      } else {
+        Seq.empty
+      }
       SegmentRow(
         segment.getLoadName,
         segment.getSegmentStatus.toString,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
index f38bb37..5d6baad 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -60,27 +60,33 @@ case class CarbonShowSegmentsCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
+    val tableStagePath = carbonTable.getStagePath
     val tablePath = carbonTable.getTablePath
     var rows: Seq[Row] = Seq()
     if (withStage) {
-      rows = CarbonShowSegmentsCommand.showStages(tablePath)
+      rows = CarbonShowSegmentsCommand.showStages(tableStagePath)
     }
 
     val segments = readSegments(tablePath, showHistory, limit)
-    rows ++ showBasic(segments, tablePath)
+    rows ++ showBasic(segments, tablePath, carbonTable.isHivePartitionTable)
   }
 
   override protected def opName: String = "SHOW SEGMENTS"
 
   private def showBasic(
       segments: Array[LoadMetadataDetails],
-      tablePath: String): Seq[Row] = {
+      tablePath: String,
+      isPartitionTable: Boolean): Seq[Row] = {
     segments
       .map { segment =>
         val startTime = getLoadStartTime(segment)
         val timeTaken = getLoadTimeTaken(segment)
         val (dataSize, indexSize) = getDataAndIndexSize(tablePath, segment)
-        val partitions = getPartitions(tablePath, segment)
+        val partitions = if (isPartitionTable) {
+          getPartitions(tablePath, segment)
+        } else {
+          Seq.empty
+        }
         val partitionString = if (partitions.size == 1) {
           partitions.head
         } else if (partitions.size > 1) {
@@ -103,8 +109,8 @@ case class CarbonShowSegmentsCommand(
 
 object CarbonShowSegmentsCommand {
 
-  def showStages(tablePath: String): Seq[Row] = {
-    toRows(readStages(tablePath))
+  def showStages(tableStagePath: String): Seq[Row] = {
+    toRows(readStages(tableStagePath))
   }
 
   private def toRows(stages: Seq[StageInput]): Seq[Row] = {

Reply via email to