This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new dd992ec  [CARBONDATA-3875] Support show segments with stage
dd992ec is described below

commit dd992ec2375f8130fd1c7530e3b603b05963842e
Author: marchpure <marchp...@126.com>
AuthorDate: Sun Jun 21 23:45:32 2020 +0800

    [CARBONDATA-3875] Support show segments with stage
    
    Why is this PR needed?
    Currently, there is no way to monitor stage information. A 'Show 
segments with stage' command should be supported to provide monitoring 
information such as createTime, partition info, etc.
    
    What changes were proposed in this PR?
    Added 'with stage' semantics to the show segments flow, which collects 
stage information by reading the stage files.
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #3798
---
 .../carbondata/core/statusmanager/StageInput.java  |  30 ++++
 docs/segment-management-on-carbondata.md           |  11 +-
 .../carbon/flink/TestCarbonPartitionWriter.scala   | 195 ++++++++++++++-------
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 116 +++++++++++-
 .../org/apache/carbondata/api/CarbonStore.scala    |  91 +++++++++-
 .../management/CarbonInsertFromStageCommand.scala  |   2 +
 .../CarbonShowSegmentsAsSelectCommand.scala        |  44 ++++-
 .../management/CarbonShowSegmentsCommand.scala     | 113 ++++++++++--
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  14 +-
 9 files changed, 519 insertions(+), 97 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
index 10dd51d..893b962 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -45,6 +45,16 @@ public class StageInput {
    */
   private List<PartitionLocation> locations;
 
+  /**
+   * current stage create at this time.
+   */
+  private transient long createTime;
+
+  /**
+   * status of stage, unloaded or loading.
+   */
+  private StageStatus status;
+
   public StageInput() {
 
   }
@@ -83,6 +93,14 @@ public class StageInput {
     this.locations = locations;
   }
 
+  public StageStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(StageStatus status) {
+    this.status = status;
+  }
+
   public List<InputSplit> createSplits() {
     return
         files.entrySet().stream().filter(
@@ -94,6 +112,14 @@ public class StageInput {
         ).collect(Collectors.toList());
   }
 
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  public void setCreateTime(long createTime) {
+    this.createTime = createTime;
+  }
+
   public static final class PartitionLocation {
 
     public PartitionLocation() {
@@ -133,4 +159,8 @@ public class StageInput {
 
   }
 
+  public enum StageStatus {
+    Unload, Loading
+  }
+
 }
diff --git a/docs/segment-management-on-carbondata.md 
b/docs/segment-management-on-carbondata.md
index 3ef0a3a..6c144b1 100644
--- a/docs/segment-management-on-carbondata.md
+++ b/docs/segment-management-on-carbondata.md
@@ -32,7 +32,7 @@ concept which helps to maintain consistency of data and easy 
transaction managem
 
   ```
   SHOW [HISTORY] SEGMENTS
-  [FOR TABLE | ON] [db_name.]table_name [LIMIT number_of_segments]
+  [FOR TABLE | ON] [db_name.]table_name [WITH STAGE] [LIMIT 
number_of_segments]
   [AS (select query from table_name_segments)]
   ```
 
@@ -65,6 +65,12 @@ concept which helps to maintain consistency of data and easy 
transaction managem
   SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable
   ```
 
+  Show all segments, including stages.
+  Stage status is 'Unload' or 'Loading'; the stage ID and the stage load time 
taken are null.
+  ```
+  SHOW SEGMENTS ON CarbonDatabase.CarbonTable WITH STAGE
+  ```
+
 
   When more detail of the segment is required, user can issue SHOW SEGMENT by 
query.    
     
@@ -99,6 +105,9 @@ concept which helps to maintain consistency of data and easy 
transaction managem
   
   SHOW SEGMENTS ON CarbonTable AS
   SELECT avg(timeTakenMs) FROM CarbonTable_segments  
+  
+  SHOW SEGMENTS ON CarbonTable WITH STAGE AS
+  SELECT avg(timeTakenMs) FROM CarbonTable_segments
   ```
 
 
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 61c4121..5e82b96 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -17,17 +17,10 @@
 
 package org.apache.carbon.flink
 
-import java.io.{File, InputStreamReader}
-import java.util
-import java.util.concurrent.{Callable, Executors}
-import java.util.{Base64, Collections, Properties}
+import java.text.SimpleDateFormat
+import java.util.concurrent.Executors
+import java.util.{Base64, Properties}
 
-import com.google.gson.Gson
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.statusmanager.StageInput
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.core.fs.Path
@@ -35,11 +28,15 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 import org.scalatest.BeforeAndAfterAll
 
 class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll{
@@ -69,6 +66,113 @@ class TestCarbonPartitionWriter extends QueryTest with 
BeforeAndAfterAll{
     }
   }
 
+  test("Show segments with stage") {
+    createPartitionTable
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val tableStagePath = CarbonTablePath.getStageDir(tablePath)
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      val dataCount = 1000
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, 
carbonProperties, environment, source)
+
+      // 1. Test "SHOW SEGMENT ON $tableanme WITH STAGE"
+      var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
+      var unloadedStageCount = 
CarbonStore.listStageFiles(tableStagePath)._1.length
+      assert(rows.length == unloadedStageCount)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getString(3) == null)
+        assert(!rows(index).getString(4).equals("NA"))
+        assert(rows(index).getString(5) != null)
+        assert(rows(index).getString(6) != null)
+        assert(rows(index).getString(7) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 2. Test "SHOW SEGMENT FOR TABLE $tableanme"
+      val rowsfortable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH 
STAGE").collect()
+      assert(rowsfortable.length == rows.length)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).toString() == rowsfortable(index).toString())
+      }
+
+      // 3. Test "SHOW SEGMENT ON $tableanme WITH STAGE AS (QUERY)"
+      rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+        s"(SELECT * FROM $tableName" + "_segments)").collect()
+      for (index <- 0 until unloadedStageCount) {
+        val row = rows(index)
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getLong(3) == -1)
+        assert(!rows(index).get(4).toString.equals("WrappedArray(NA)"))
+        assert(rows(index).getLong(5) > 0)
+        assert(rows(index).getLong(6) > 0)
+        assert(rows(index).getString(7) == null)
+        assert(rows(index).getString(8) == null)
+        assert(rows(index).getString(9) == null)
+        assert(rows(index).getString(10) == null)
+        assert(rows(index).getString(11) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 4. Test "SHOW SEGMENT ON $tableanme WITH STAGE LIMIT 1 AS (QUERY)"
+      //    Test "SHOW SEGMENT ON $tableanme LIMIT 1 AS (QUERY)"
+      if (unloadedStageCount > 1) {
+        sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
+
+        unloadedStageCount = 
CarbonStore.listStageFiles(tableStagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 
1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
+          s"(SELECT * FROM $tableName" + "_segments)").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        unloadedStageCount = 
CarbonStore.listStageFiles(tableStagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 
'Unload')").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 
'Success')").collect()
+        assert(rows.length >= 1)
+        assert(rows(0).getString(1).equals("Success"))
+
+        // createFakeLoadingStage
+        createFakeLoadingStage(CarbonTablePath.getStageDir(tablePath))
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 
'Loading')").collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1).equals("Loading"))
+
+        var (unloadedFiles, loadingFiles) = 
CarbonStore.listStageFiles(tableStagePath)
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments " +
+          "where status = 'Unload' or status = 'Loading')").collect()
+        assert(rows.length == unloadedFiles.length + loadingFiles.length)
+      }
+    }
+  }
+
   test("test concurrent insertstage") {
     createPartitionTable
     try {
@@ -85,11 +189,12 @@ class TestCarbonPartitionWriter extends QueryTest with 
BeforeAndAfterAll{
       val source = getTestSource(dataCount)
       executeStreamingEnvironment(tablePath, writerProperties, 
carbonProperties, environment, source)
 
+      Thread.sleep(5000)
       val executorService = Executors.newFixedThreadPool(10)
       for(i <- 1 to 10) {
         executorService.submit(new Runnable {
           override def run(): Unit = {
-            sql(s"INSERT INTO $tableName STAGE 
OPTIONS('batch_file_count'='1')")
+            sql(s"INSERT INTO $tableName STAGE 
OPTIONS('batch_file_count'='5')")
           }
         }).get()
       }
@@ -306,56 +411,24 @@ class TestCarbonPartitionWriter extends QueryTest with 
BeforeAndAfterAll{
     properties
   }
 
-  private def collectStageInputs(loadDetailsDir: String): Seq[StageInput] = {
-    val dir = FileFactory.getCarbonFile(loadDetailsDir)
-    val stageFiles = if (dir.exists()) {
-      val allFiles = dir.listFiles()
-      val successFiles = allFiles.filter { file =>
-        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
-      }.map { file =>
-        (file.getName.substring(0, file.getName.indexOf(".")), file)
-      }.toMap
-      allFiles.filter { file =>
-        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
-      }.filter { file =>
-        successFiles.contains(file.getName)
-      }.map { file =>
-        (file, successFiles(file.getName))
-      }
-    } else {
-      Array.empty
-    }
-
-    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
-    val gson = new Gson()
-    stageFiles.map { stage =>
-      val filePath = stage._1.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
-      try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
-        output.add(stageInput)
-      } finally {
-        stream.close()
-      }
+  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): 
Unit = {
+    if (index > 0) {
+      val nowtime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index).getString(2)).getTime
+      val lasttime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index - 1).getString(2)).getTime
+      assert(nowtime <= lasttime)
     }
-    output.asScala
   }
 
-  private def delDir(dir: File): Boolean = {
-    if (dir.isDirectory) {
-      val children = dir.list
-      if (children != null) {
-        val length = children.length
-        var i = 0
-        while (i < length) {
-          if (!delDir(new File(dir, children(i)))) {
-              return false
-          }
-          i += 1
-        }
-      }
-    }
-    dir.delete()
+  private def createFakeLoadingStage(stagePath: String): Unit = {
+    var (unloadedFiles, loadingFiles) = CarbonStore.listStageFiles(stagePath)
+    assert(unloadedFiles.length > 0)
+    val loadingFilesCountBefore = loadingFiles.length
+    FileFactory.getCarbonFile(unloadedFiles(0).getAbsolutePath +
+      CarbonTablePath.LOADING_FILE_SUBFIX).createNewFile()
+    loadingFiles = CarbonStore.listStageFiles(stagePath)._2
+    val loadingFilesCountAfter = loadingFiles.length
+    assert(loadingFilesCountAfter == loadingFilesCountBefore + 1)
   }
-
 }
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index c40273d..74c5d94 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -17,25 +17,27 @@
 
 package org.apache.carbon.flink
 
+import java.text.SimpleDateFormat
 import java.util.Properties
 
 import org.apache.flink.api.common.JobExecutionResult
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.spark.sql.execution.exchange.Exchange
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
 import org.scalatest.BeforeAndAfterAll
 
 class TestCarbonWriter extends QueryTest with BeforeAndAfterAll{
@@ -241,6 +243,99 @@ class TestCarbonWriter extends QueryTest with 
BeforeAndAfterAll{
     }
   }
 
+  test("Show segments with stage") {
+    createTable
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val stagePath = CarbonTablePath.getStageDir(tablePath)
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      executeFlinkStreamingEnvironment(environment, writerProperties, 
carbonProperties)
+
+      // 1. Test "SHOW SEGMENT ON $tableanme WITH STAGE"
+      var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
+      var unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+      assert(rows.length == unloadedStageCount)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getString(3) == null)
+        assert(rows(index).getString(4).equals("NA"))
+        assert(rows(index).getString(5) != null)
+        assert(rows(index).getString(6) != null)
+        assert(rows(index).getString(7) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 2. Test "SHOW SEGMENT FOR TABLE $tableanme"
+      val rowsfortable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH 
STAGE").collect()
+      assert(rowsfortable.length == rows.length)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).toString() == rowsfortable(index).toString())
+      }
+
+      // 3. Test "SHOW SEGMENT ON $tableanme WITH STAGE AS (QUERY)"
+      rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+        s"(SELECT * FROM $tableName" + "_segments)").collect()
+      for (index <- 0 until unloadedStageCount) {
+        val row = rows(index)
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getLong(3) == -1)
+        assert(rows(index).get(4).toString.equals("WrappedArray(NA)"))
+        assert(rows(index).getLong(5) > 0)
+        assert(rows(index).getLong(6) > 0)
+        assert(rows(index).getString(7) == null)
+        assert(rows(index).getString(8) == null)
+        assert(rows(index).getString(9) == null)
+        assert(rows(index).getString(10) == null)
+        assert(rows(index).getString(11) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 4. Test "SHOW SEGMENT ON $tableanme WITH STAGE LIMIT 1 AS (QUERY)"
+      //    Test "SHOW SEGMENT ON $tableanme LIMIT 1 AS (QUERY)"
+
+      if (unloadedStageCount > 1) {
+        sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
+
+        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 
1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
+          s"(SELECT * FROM $tableName" + "_segments)").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 
'Unload')").collect()
+        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 
'Success')").collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1).equals("Success"))
+      }
+    }
+  }
+
   private def executeFlinkStreamingEnvironment(environment: 
StreamExecutionEnvironment,
       writerProperties: Properties,
       carbonProperties: Properties): JobExecutionResult = {
@@ -321,4 +416,13 @@ class TestCarbonWriter extends QueryTest with 
BeforeAndAfterAll{
     properties
   }
 
+  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): 
Unit = {
+    if (index > 0) {
+      val nowtime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index).getString(2)).getTime
+      val lasttime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index - 1).getString(2)).getTime
+      assert(nowtime <= lasttime)
+    }
+  }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 02d36cf..d970e00 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,11 +17,16 @@
 
 package org.apache.carbondata.api
 
+import java.io.InputStreamReader
 import java.time.{Duration, Instant}
+import java.util
+import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 
+import com.google.gson.Gson
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -35,7 +40,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, 
ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, 
LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, 
LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -45,7 +50,7 @@ object CarbonStore {
   def readSegments(
       tablePath: String,
       showHistory: Boolean,
-      limit: Option[String]): Array[LoadMetadataDetails] = {
+      limit: Option[Int]): Array[LoadMetadataDetails] = {
     val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
     var segmentsMetadataDetails = if (showHistory) {
       SegmentStatusManager.readLoadMetadata(metaFolder) ++
@@ -63,13 +68,91 @@ object CarbonStore {
     }
 
     if (limit.isDefined) {
-      val lim = Integer.parseInt(limit.get)
-      segmentsMetadataDetails.slice(0, lim)
+      segmentsMetadataDetails.slice(0, limit.get)
     } else {
       segmentsMetadataDetails
     }
   }
 
+  /**
+   * Read stage files and return input files
+   */
+  def readStages(tablePath: String): Seq[StageInput] = {
+    val stageFiles = listStageFiles(CarbonTablePath.getStageDir(tablePath))
+    var output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    output.addAll(readStageInput(stageFiles._1,
+      StageInput.StageStatus.Unload).asJavaCollection)
+    output.addAll(readStageInput(stageFiles._2,
+      StageInput.StageStatus.Loading).asJavaCollection)
+    Collections.sort(output, new Comparator[StageInput]() {
+      def compare(stageInput1: StageInput, stageInput2: StageInput): Int = {
+        (stageInput2.getCreateTime - stageInput1.getCreateTime).intValue()
+      }
+    })
+    output.asScala
+  }
+
+  /**
+   * Read stage files and return input files
+   */
+  def readStageInput(
+      stageFiles: Seq[CarbonFile],
+      status: StageInput.StageStatus): Seq[StageInput] = {
+    val gson = new Gson()
+    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    stageFiles.map { stage =>
+      val filePath = stage.getAbsolutePath
+      val stream = FileFactory.getDataInputStream(filePath)
+      try {
+        val stageInput = gson.fromJson(new InputStreamReader(stream), 
classOf[StageInput])
+        stageInput.setCreateTime(stage.getLastModifiedTime)
+        stageInput.setStatus(status)
+        output.add(stageInput)
+      } finally {
+        stream.close()
+      }
+    }
+    output.asScala
+  }
+
+  /*
+   * Collect all stage files and matched success files and loading files.
+   * return unloaded stagefiles and loading stagefiles in the end.
+   */
+  def listStageFiles(
+        loadDetailsDir: String): (Array[CarbonFile], Array[CarbonFile]) = {
+    val dir = FileFactory.getCarbonFile(loadDetailsDir)
+    if (dir.exists()) {
+      // 1. List all files in the stage directory.
+      val allFiles = dir.listFiles()
+
+      // 2. Get StageFile list.
+      // Firstly, get the stage files in the stage directory.
+      //        which exclude the success files and loading files
+      // Second,  only collect the stage files having success tag.
+      val stageFiles = allFiles.filterNot { file =>
+        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.filterNot { file =>
+        file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUBFIX)
+      }.filter { file =>
+        allFiles.contains(file.getName + CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.sortWith {
+        (file1, file2) => file1.getLastModifiedTime > file2.getLastModifiedTime
+      }
+      // 3. Get the unloaded stage files, which haven't loading tag.
+      val unloadedFiles = stageFiles.filterNot { file =>
+        allFiles.contains(file.getName + CarbonTablePath.LOADING_FILE_SUBFIX)
+      }
+      // 4. Get the loading stage files, which have loading tag.
+      val loadingFiles = stageFiles.filter { file =>
+        allFiles.contains(file.getName + CarbonTablePath.LOADING_FILE_SUBFIX)
+      }
+      (unloadedFiles, loadingFiles)
+    } else {
+      (Array.empty, Array.empty)
+    }
+  }
+
   def getPartitions(tablePath: String, load: LoadMetadataDetails): Seq[String] 
= {
     val segmentFile = SegmentFileStore.readSegmentFile(
       CarbonTablePath.getSegmentFilePath(tablePath, load.getSegmentFile))
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index cb27db9..e8420ec 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -680,6 +680,8 @@ case class CarbonInsertFromStageCommand(
       val stageFiles = allFiles.filter { file =>
         !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.filter { file =>
+        !file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUBFIX)
+      }.filter { file =>
         successFiles.contains(file.getName)
       }.filterNot { file =>
         loadingFiles.contains(file.getName)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
index 7d1c710..f1e668e 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.api.CarbonStore.readSegments
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
@@ -35,8 +36,9 @@ case class CarbonShowSegmentsAsSelectCommand(
     databaseNameOp: Option[String],
     tableName: String,
     query: String,
-    limit: Option[String],
-    showHistory: Boolean = false)
+    limit: Option[Int],
+    showHistory: Boolean = false,
+    withStage: Boolean = false)
   extends DataCommand {
 
   private lazy val sparkSession = SparkSession.getActiveSession.get
@@ -48,7 +50,7 @@ case class CarbonShowSegmentsAsSelectCommand(
 
   override def output: Seq[Attribute] = {
     df.queryExecution.analyzed.output.map { attr =>
-      AttributeReference(attr.name, attr.dataType, nullable = false)()
+      AttributeReference(attr.name, attr.dataType, nullable = true)()
     }
   }
 
@@ -72,9 +74,34 @@ case class CarbonShowSegmentsAsSelectCommand(
 
   private def createDataFrame: DataFrame = {
     val tablePath = carbonTable.getTablePath
-    val segments = CarbonStore.readSegments(tablePath, showHistory, limit)
+    var rows: Seq[SegmentRow] = Seq()
+    if (withStage) {
+      val stageRows = CarbonShowSegmentsCommand.showStages(tablePath)
+      if (stageRows.nonEmpty) {
+        rows = stageRows.map(
+          stageRow =>
+            SegmentRow (
+              stageRow.getString(0),
+              stageRow.getString(1),
+              stageRow.getString(2),
+              -1,
+              Seq(stageRow.getString(4)),
+              stageRow.getString(5).toLong,
+              stageRow.getString(6).toLong,
+              null,
+              stageRow.getString(7),
+              null,
+              null,
+              null
+            )
+        )
+      }
+    }
+
+    val segments = readSegments(tablePath, showHistory, limit)
     val tempViewName = makeTempViewName(carbonTable)
-    registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments)
+    registerSegmentRowView(sparkSession, tempViewName,
+      carbonTable, segments, rows)
     try {
       sparkSession.sql(query)
     } catch {
@@ -95,11 +122,12 @@ case class CarbonShowSegmentsAsSelectCommand(
       sparkSession: SparkSession,
       tempViewName: String,
       carbonTable: CarbonTable,
-      segments: Array[LoadMetadataDetails]): Unit = {
+      segments: Array[LoadMetadataDetails],
+      rows: Seq[SegmentRow]): Unit = {
 
     // populate a dataframe containing all segment information
     val tablePath = carbonTable.getTablePath
-    val segmentRows = segments.toSeq.map { segment =>
+    val segmentRowView = rows ++ segments.toSeq.map { segment =>
       val mergedToId = CarbonStore.getMergeTo(segment)
       val path = CarbonStore.getExternalSegmentPath(segment)
       val startTime = CarbonStore.getLoadStartTime(segment)
@@ -123,7 +151,7 @@ case class CarbonShowSegmentsAsSelectCommand(
     }
 
     // create a temp view using the populated dataframe and execute the query 
on it
-    val df = sparkSession.createDataFrame(segmentRows)
+    val df = sparkSession.createDataFrame(segmentRowView)
     checkIfTableExist(sparkSession, tempViewName)
     df.createOrReplaceTempView(tempViewName)
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
index 22d0882..f38bb37 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -17,34 +17,40 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
-import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, 
getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments}
+import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, 
getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments, readStages}
 import org.apache.carbondata.common.Strings
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
StageInput}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 
 case class CarbonShowSegmentsCommand(
     databaseNameOp: Option[String],
     tableName: String,
-    limit: Option[String],
-    showHistory: Boolean = false)
+    limit: Option[Int],
+    showHistory: Boolean = false,
+    withStage: Boolean = false)
   extends DataCommand {
 
   // add new columns of show segments at last
   override def output: Seq[Attribute] = {
     Seq(
-      AttributeReference("ID", StringType, nullable = false)(),
+      AttributeReference("ID", StringType, nullable = true)(),
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", StringType, nullable = false)(),
       AttributeReference("Load Time Taken", StringType, nullable = true)(),
       AttributeReference("Partition", StringType, nullable = true)(),
       AttributeReference("Data Size", StringType, nullable = false)(),
       AttributeReference("Index Size", StringType, nullable = false)(),
-      AttributeReference("File Format", StringType, nullable = false)())
+      AttributeReference("File Format", StringType, nullable = true)())
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -55,12 +61,13 @@ case class CarbonShowSegmentsCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
     val tablePath = carbonTable.getTablePath
-    val segments = readSegments(tablePath, showHistory, limit)
-    if (segments.nonEmpty) {
-      showBasic(segments, tablePath)
-    } else {
-      Seq.empty
+    var rows: Seq[Row] = Seq()
+    if (withStage) {
+      rows = CarbonShowSegmentsCommand.showStages(tablePath)
     }
+
+    val segments = readSegments(tablePath, showHistory, limit)
+    rows ++ showBasic(segments, tablePath)
   }
 
   override protected def opName: String = "SHOW SEGMENTS"
@@ -93,3 +100,87 @@ case class CarbonShowSegmentsCommand(
       }.toSeq
   }
 }
+
+object CarbonShowSegmentsCommand {
+
+  /**
+   * Reads the stage files of the table at `tablePath` and renders each stage as a row
+   * with the same eight columns as the SHOW SEGMENTS output.
+   */
+  def showStages(tablePath: String): Seq[Row] = {
+    toRows(readStages(tablePath))
+  }
+
+  /** Converts each stage into its row, keeping the order of `stages`. */
+  private def toRows(stages: Seq[StageInput]): Seq[Row] = {
+    stages.flatMap(stage => toRows(stage))
+  }
+
+  /**
+   * Builds the row for one stage. A stage with a non-null file map is reported as
+   * non-partitioned; otherwise sizes and partition text are gathered from its locations.
+   */
+  private def toRows(stage: StageInput): Seq[Row] = {
+    if (stage.getFiles != null) {
+      // Non-partition stage
+      Seq(stageRow(stage, "NA", countDataFileSize(stage.getFiles),
+        countIndexFileSize(stage.getFiles)))
+    } else {
+      // Partition stage
+      // NOTE(review): the null guard assumes a stage may carry neither files nor
+      // locations - confirm against StageInput.
+      val locations = Option(stage.getLocations).map(_.asScala.toSeq).getOrElse(Seq.empty)
+      // A location without partition values contributes no text. It used to overwrite
+      // the accumulated text with "NA", losing the partitions collected before it.
+      // NOTE(review): the trailing "," after a single-key entry is kept as it was.
+      val partitionString = locations.map(_.getPartitions.asScala).collect {
+        case partitions if partitions.size == 1 =>
+          partitions.head._1 + "=" + partitions.head._2 + ","
+        case partitions if partitions.size > 1 =>
+          partitions.head._1 + "=" + partitions.head._2 + ", ..."
+      }.mkString
+      Seq(stageRow(
+        stage,
+        if (partitionString.isEmpty) "NA" else partitionString,
+        locations.map(location => countDataFileSize(location.getFiles)).sum,
+        locations.map(location => countIndexFileSize(location.getFiles)).sum))
+    }
+  }
+
+  /** Lays out one stage row; ID, Load Time Taken and File Format are left null. */
+  private def stageRow(
+      stage: StageInput,
+      partition: String,
+      dataFileSize: Long,
+      indexFileSize: Long): Row = {
+    Row(
+      null,
+      stage.getStatus.toString,
+      new java.sql.Timestamp(stage.getCreateTime).toString,
+      null,
+      partition,
+      dataFileSize.toString,
+      indexFileSize.toString,
+      null)
+  }
+
+  /** Sums the sizes of the entries in `files` whose name ends with the data file extension. */
+  private def countDataFileSize(files: java.util.Map[java.lang.String, java.lang.Long]): Long = {
+    sumFileSizes(files, CarbonTablePath.CARBON_DATA_EXT)
+  }
+
+  /** Sums the sizes of the entries in `files` that are index or merge-index files. */
+  private def countIndexFileSize(files: java.util.Map[java.lang.String, java.lang.Long]): Long = {
+    sumFileSizes(files, CarbonTablePath.INDEX_FILE_EXT, CarbonTablePath.MERGE_INDEX_FILE_EXT)
+  }
+
+  /** Adds up the sizes of all entries whose file name ends with one of `extensions`. */
+  private def sumFileSizes(
+      files: java.util.Map[java.lang.String, java.lang.Long],
+      extensions: String*): Long = {
+    files.asScala.iterator.collect {
+      case (name, size) if extensions.exists(ext => name.endsWith(ext)) => size.longValue
+    }.sum
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1353919..b101257 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -541,21 +541,23 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    */
   protected lazy val showSegments: Parser[LogicalPlan] =
     (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ 
".").? ~ ident ~
-    (LIMIT ~> numericLit).? ~ (AS  ~> restInput).? <~ opt(";") ^^ {
-      case showHistory ~ databaseName ~ tableName ~ limit ~ queryOp =>
+      opt(WITH <~ STAGE) ~ (LIMIT ~> numericLit).? ~ (AS  ~> restInput).? <~ 
opt(";") ^^ {
+      case showHistory ~ databaseName ~ tableName ~ withStage ~ limit ~ 
queryOp =>
         if (queryOp.isEmpty) {
           CarbonShowSegmentsCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
-            limit,
-            showHistory.isDefined)
+            if (limit.isDefined) Some(Integer.valueOf(limit.get)) else None,
+            showHistory.isDefined,
+            withStage.isDefined)
         } else {
           CarbonShowSegmentsAsSelectCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
             queryOp.get,
-            limit,
-            showHistory.isDefined)
+            if (limit.isDefined) Some(Integer.valueOf(limit.get)) else None,
+            showHistory.isDefined,
+            withStage.isDefined)
         }
     }
 

Reply via email to