This is an automated email from the ASF dual-hosted git repository.
liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b36fc3e [CARBONDATA-3856] Support the LIMIT operator for show segments command
b36fc3e is described below
commit b36fc3ec05abd8a1f631d480a6759d04df5b03f2
Author: haomarch <[email protected]>
AuthorDate: Wed Jun 17 18:44:20 2020 +0800
[CARBONDATA-3856] Support the LIMIT operator for show segments command
Why is this PR needed?
When the number of segments is large, the "SHOW SEGMENTS" command takes too
long. Supporting a LIMIT operator in SHOW SEGMENTS lets users cap the number
of segments returned.
What changes were proposed in this PR?
Add the LIMIT operator to the grammar parsing and to the segment-reading
function. The descending sort by segment id moves from the display step into
readSegments, so that LIMIT keeps the segments with the largest ids.
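In outline, the change threads the parsed limit down to the segment read.
The following is a hedged standalone sketch of that flow with illustrative
names (not the actual CarbonShowSegmentsCommand/CarbonStore API):

```
object ShowSegmentsFlowSketch {
  // The parser yields the limit as Option[String]; the sketch sorts segment
  // ids (load names) in descending order, then applies the limit.
  def readSegments(loadNames: Array[String], limit: Option[String]): Array[String] = {
    val sorted = loadNames.sortBy(name => -java.lang.Double.parseDouble(name))
    limit.fold(sorted)(n => sorted.slice(0, Integer.parseInt(n)))
  }

  def main(args: Array[String]): Unit = {
    val names = Array("0", "1", "2", "0.1")
    println(readSegments(names, Some("2")).mkString(",")) // 2,1
    println(readSegments(names, None).mkString(","))      // 2,1,0.1,0
  }
}
```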
Does this PR introduce any user interface change?
YES
Is any new testcase added?
YES
This closes #3792
---
docs/segment-management-on-carbondata.md | 11 +++++++-
.../org/apache/carbondata/api/CarbonStore.scala | 19 ++++++++++---
.../CarbonShowSegmentsAsSelectCommand.scala | 3 ++-
.../management/CarbonShowSegmentsCommand.scala | 10 +++----
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 6 +++--
.../testsuite/segment/ShowSegmentTestCase.scala | 31 ++++++++++++++++++++--
.../segmentreading/TestSegmentReading.scala | 6 +++--
7 files changed, 68 insertions(+), 18 deletions(-)
diff --git a/docs/segment-management-on-carbondata.md b/docs/segment-management-on-carbondata.md
index d4fe339..3ef0a3a 100644
--- a/docs/segment-management-on-carbondata.md
+++ b/docs/segment-management-on-carbondata.md
@@ -32,7 +32,7 @@ concept which helps to maintain consistency of data and easy transaction managem
```
SHOW [HISTORY] SEGMENTS
- [FOR TABLE | ON] [db_name.]table_name
+ [FOR TABLE | ON] [db_name.]table_name [LIMIT number_of_segments]
[AS (select query from table_name_segments)]
```
@@ -54,6 +54,12 @@ concept which helps to maintain consistency of data and easy transaction managem
SHOW SEGMENTS ON CarbonDatabase.CarbonTable
```
+ Show the 10 visible segments with the largest segment ids
+
+ ```
+ SHOW SEGMENTS ON CarbonDatabase.CarbonTable LIMIT 10
+ ```
+
Show all segments, including invisible segments
```
SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable
@@ -83,6 +89,9 @@ concept which helps to maintain consistency of data and easy transaction managem
SHOW SEGMENTS ON CarbonTable AS
SELECT * FROM CarbonTable_segments
+ SHOW SEGMENTS ON CarbonTable LIMIT 10 AS
+ SELECT * FROM CarbonTable_segments
+
SHOW SEGMENTS ON CarbonTable AS
SELECT id, dataSize FROM CarbonTable_segments
WHERE status='Success'
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index d86f7be..02d36cf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -42,16 +42,29 @@ import org.apache.carbondata.streaming.segment.StreamSegment
object CarbonStore {
private val LOGGER =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def readSegments(tablePath: String, showHistory: Boolean): Array[LoadMetadataDetails] = {
+ def readSegments(
+ tablePath: String,
+ showHistory: Boolean,
+ limit: Option[String]): Array[LoadMetadataDetails] = {
val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
- val segmentsMetadataDetails = if (showHistory) {
+ var segmentsMetadataDetails = if (showHistory) {
SegmentStatusManager.readLoadMetadata(metaFolder) ++
SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
} else {
SegmentStatusManager.readLoadMetadata(metaFolder)
}
if (!showHistory) {
- segmentsMetadataDetails.filter(_.getVisibility.equalsIgnoreCase("true"))
+ segmentsMetadataDetails = segmentsMetadataDetails
+ .filter(_.getVisibility.equalsIgnoreCase("true"))
+ segmentsMetadataDetails = segmentsMetadataDetails.sortWith { (l1, l2) =>
+ java.lang.Double.parseDouble(l1.getLoadName) >
+ java.lang.Double.parseDouble(l2.getLoadName)
+ }
+ }
+
+ if (limit.isDefined) {
+ val lim = Integer.parseInt(limit.get)
+ segmentsMetadataDetails.slice(0, lim)
} else {
segmentsMetadataDetails
}
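A note on the hunk above: Scala's Array.slice clamps out-of-range bounds, so
a LIMIT larger than the number of segments returns all of them instead of
failing. A quick illustration in plain Scala (not Carbon code):

```
// slice never throws on an out-of-range upper bound; it clamps to length.
val segments = Array("2", "1", "0")
assert(segments.slice(0, 10).length == 3)             // LIMIT 10 on 3 segments
assert(segments.slice(0, 1).sameElements(Array("2"))) // LIMIT 1 keeps the newest
```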
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
index 26772dd..7d1c710 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -35,6 +35,7 @@ case class CarbonShowSegmentsAsSelectCommand(
databaseNameOp: Option[String],
tableName: String,
query: String,
+ limit: Option[String],
showHistory: Boolean = false)
extends DataCommand {
@@ -71,7 +72,7 @@ case class CarbonShowSegmentsAsSelectCommand(
private def createDataFrame: DataFrame = {
val tablePath = carbonTable.getTablePath
- val segments = CarbonStore.readSegments(tablePath, showHistory)
+ val segments = CarbonStore.readSegments(tablePath, showHistory, limit)
val tempViewName = makeTempViewName(carbonTable)
registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments)
try {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
index 53ea021..22d0882 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
case class CarbonShowSegmentsCommand(
databaseNameOp: Option[String],
tableName: String,
+ limit: Option[String],
showHistory: Boolean = false)
extends DataCommand {
@@ -54,7 +55,7 @@ case class CarbonShowSegmentsCommand(
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
val tablePath = carbonTable.getTablePath
- val segments = readSegments(tablePath, showHistory)
+ val segments = readSegments(tablePath, showHistory, limit)
if (segments.nonEmpty) {
showBasic(segments, tablePath)
} else {
@@ -65,13 +66,8 @@ case class CarbonShowSegmentsCommand(
override protected def opName: String = "SHOW SEGMENTS"
private def showBasic(
- allSegments: Array[LoadMetadataDetails],
+ segments: Array[LoadMetadataDetails],
tablePath: String): Seq[Row] = {
- val segments = allSegments.sortWith { (l1, l2) =>
- java.lang.Double.parseDouble(l1.getLoadName) >
- java.lang.Double.parseDouble(l2.getLoadName)
- }
-
segments
.map { segment =>
val startTime = getLoadStartTime(segment)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 429aac3..1353919 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -541,18 +541,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
*/
protected lazy val showSegments: Parser[LogicalPlan] =
(SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ ".").? ~ ident ~
- (AS ~> restInput).? <~ opt(";") ^^ {
- case showHistory ~ databaseName ~ tableName ~ queryOp =>
+ (LIMIT ~> numericLit).? ~ (AS ~> restInput).? <~ opt(";") ^^ {
+ case showHistory ~ databaseName ~ tableName ~ limit ~ queryOp =>
if (queryOp.isEmpty) {
CarbonShowSegmentsCommand(
CarbonParserUtil.convertDbNameToLowerCase(databaseName),
tableName.toLowerCase(),
+ limit,
showHistory.isDefined)
} else {
CarbonShowSegmentsAsSelectCommand(
CarbonParserUtil.convertDbNameToLowerCase(databaseName),
tableName.toLowerCase(),
queryOp.get,
+ limit,
showHistory.isDefined)
}
}
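For readers less familiar with Scala's parser combinators, here is a minimal
sketch of what the new `(LIMIT ~> numericLit).?` fragment contributes. It
assumes a standalone JavaTokenParsers setup and uses wholeNumber as a
stand-in for Carbon's numericLit:

```
import scala.util.parsing.combinator.JavaTokenParsers

// An optional "LIMIT <n>" clause yielding Option[String]: Some("10") when
// present, None when absent. Keyword case-sensitivity is simplified here.
object LimitClauseSketch extends JavaTokenParsers {
  def limitClause: Parser[Option[String]] = opt("LIMIT" ~> wholeNumber)

  def main(args: Array[String]): Unit = {
    println(parseAll(limitClause, "LIMIT 10").get) // Some(10)
    println(parseAll(limitClause, "").get)         // None
  }
}
```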
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
index 0fbd39c..27de94f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
@@ -69,11 +69,22 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
| select id, status, datasize from source_segments where status = 'Success' order by dataSize
|""".stripMargin).collect()
+
assertResult("4.1")(rows(0).get(0))
assertResult("Success")(rows(0).get(1))
assertResult("0.2")(rows(1).get(0))
assertResult("Success")(rows(1).get(1))
+ rows = sql(
+ """
+ | show segments on source limit 2 as
+ | select id, status, datasize from source_segments where status = 'Success' order by dataSize
+ |""".stripMargin).collect()
+
+ assertResult("4.1")(rows(0).get(0))
+ assertResult("Success")(rows(0).get(1))
+ assertResult(1)(rows.length)
+
val tables = sql("show tables").collect()
assert(!tables.toSeq.exists(_.get(1).equals("source_segments")))
@@ -84,7 +95,9 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""drop TABLE if exists source""").collect
sql(s"""CREATE TABLE source (CUST_ID int,CUST_NAME
String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1
double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1
int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect
checkAnswer(sql("show segments on source"), Seq.empty)
- val result = sql("show segments on source as select * from source_segments").collect()
+ var result = sql("show segments on source as select * from source_segments").collect()
+ assertResult(0)(result.length)
+ result = sql("show segments on source limit 10 as select * from source_segments").collect()
assertResult(0)(result.length)
}
@@ -144,6 +157,7 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
assert(historyDetail.length == 0)
sql(s"clean files for table ${tableName}")
assert(sql(s"show segments on ${tableName}").collect().length == 2)
+ assert(sql(s"show segments on ${tableName} limit 1").collect().length == 1)
detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
assert(detail.length == 4)
@@ -163,7 +177,7 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"clean files for table ${tableName}")
assert(sql(s"show segments on ${tableName} as select * from
${tableName}_segments").collect().length == 2)
sql(s"show history segments on ${tableName} as select * from
${tableName}_segments").show(false)
- val segmentsHistoryList = sql(s"show history segments on ${tableName} as
select * from ${tableName}_segments").collect()
+ var segmentsHistoryList = sql(s"show history segments on ${tableName} as
select * from ${tableName}_segments").collect()
assert(segmentsHistoryList.length == 10)
assertResult("0")(segmentsHistoryList(0).getString(0))
assertResult("Compacted")(segmentsHistoryList(0).getString(1))
@@ -183,6 +197,15 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
assertResult("Compacted")(segmentsHistoryList(8).getString(1))
assertResult("4")(segmentsHistoryList(9).getString(0))
assertResult("Compacted")(segmentsHistoryList(9).getString(1))
+
+ segmentsHistoryList = sql(s"show history segments on ${tableName} limit 2 as select * from ${tableName}_segments").collect()
+ assert(segmentsHistoryList.length == 2)
+ assertResult("0")(segmentsHistoryList(0).getString(0))
+ assertResult("Compacted")(segmentsHistoryList(0).getString(1))
+ assertResult("0.1")(segmentsHistoryList(0).getString(7))
+ assertResult("0.2")(segmentsHistoryList(1).getString(0))
+ assertResult("Success")(segmentsHistoryList(1).getString(1))
+
assert(sql(s"show history segments on ${tableName} as select * from
${tableName}_segments limit 3").collect().length == 3)
dropTable(tableName)
}
@@ -191,7 +214,11 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists a")
sql("create table a(a string) stored as carbondata")
sql("insert into a select 'k'")
+ sql("insert into a select 'j'")
+ sql("insert into a select 'k'")
val rows = sql("show segments for table a").collect()
+ assert(rows.length == 3)
+ assert(sql(s"show segments for table a limit 1").collect().length == 1)
assert(rows(0).getString(3).replace("S", "").toDouble > 0)
assert(rows(0).getString(7).equalsIgnoreCase("columnar_v3"))
sql("drop table if exists a")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index 8707ec7..4de0761 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -254,10 +254,12 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
val col = df.collect().map{
row => Row(row.getString(0),row.getString(1),row.getString(7))
}.toSeq
- assert(col.equals(Seq(Row("0","Compacted","0.1"),
+ assert(col.equals(Seq(
+ Row("2","Success","NA"),
Row("1","Compacted","0.1"),
Row("0.1","Success","NA"),
- Row("2","Success","NA"))))
+ Row("0","Compacted","0.1")
+ )))
}
finally {
sql("SET carbon.input.segments.default.carbon_table=*")