This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1a0ed65 [CARBONDATA-3843] Support merging index for streaming table
1a0ed65 is described below
commit 1a0ed65270acc6694ed52b13aaddf55bfcfe0422
Author: ajantha-bhat <[email protected]>
AuthorDate: Tue Jun 2 20:00:38 2020 +0530
[CARBONDATA-3843] Support merging index for streaming table
Why is this PR needed?
Merge index files are not created for normal segments (created by load, insert,
compaction, or handoff) on a streaming table.
What changes were proposed in this PR?
For a streaming table, allow merge index creation for all kinds of segments
except streaming segments (ROW_V1).
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3785
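For illustration, a minimal usage sketch of what this change enables (the table name, schema, and CSV path below are hypothetical, and sql(...) stands for a Spark session's sql call as used in the test suite):

// Hypothetical example, not part of the patch.
sql("CREATE TABLE sensor_events(id INT, name STRING) STORED AS carbondata " +
  "TBLPROPERTIES('streaming'='true')")
sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE sensor_events OPTIONS('header'='false')")
// Before this change the next statement failed with MalformedCarbonCommandException;
// it now writes a merge index file for every columnar segment and skips ROW_V1 segments.
sql("alter table sensor_events compact 'segment_index'")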
---
docs/ddl-of-carbondata.md | 3 +-
.../spark/sql/events/MergeIndexEventListener.scala | 134 +++++++++++----------
.../CarbonAlterTableCompactionCommand.scala | 5 -
.../CarbonAlterTableAddHivePartitionCommand.scala | 11 +-
.../CarbonIndexFileMergeTestCase.scala | 64 ++++++++--
5 files changed, 134 insertions(+), 83 deletions(-)
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3165f4e..e7cfb0c 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -751,8 +751,9 @@ Users can specify which columns to include and exclude for local dictionary gene
   ```
   **NOTE:**
+     * Merge index is supported on streaming table from carbondata 2.0.1 version.
+       But streaming segments (ROW_V1) cannot create merge index.
-     * Merge index is not supported on streaming table.
   #### SET and UNSET
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 2995edc..4e06ff0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -30,9 +30,8 @@ import org.apache.spark.util.MergeIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
@@ -62,7 +61,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
            .asInstanceOf[util.List[String]]
        }
        val tempPath = operationContext.getProperty("tempPath")
-        if(!carbonTable.isStreamingSink) {
+        val loadMetaDetails = loadModel.getCurrentLoadMetadataDetail
+        if (loadMetaDetails != null && !loadMetaDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
          if (null != compactedSegments && !compactedSegments.isEmpty) {
            MergeIndexUtil.mergeIndexFilesForCompactedSegments(sparkSession, carbonTable,
@@ -104,73 +104,77 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
      case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
        val carbonMainTable = alterTableMergeIndexEvent.carbonTable
        val sparkSession = alterTableMergeIndexEvent.sparkSession
-        if (!carbonMainTable.isStreamingSink) {
-          LOGGER.info(s"Merge Index request received for table " +
-                      s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
-          val lock = CarbonLockFactory.getCarbonLockObj(
-            carbonMainTable.getAbsoluteTableIdentifier,
-            LockUsage.COMPACTION_LOCK)
+        LOGGER.info(s"Merge Index request received for table " +
+                    s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonMainTable.getAbsoluteTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
-          try {
-            if (lock.lockWithRetries()) {
-              LOGGER.info("Acquired the compaction lock for table" +
-                          s" ${ carbonMainTable.getDatabaseName }.${
-                            carbonMainTable
-                              .getTableName
-                          }")
-              val segmentsToMerge =
-                if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                  val validSegments =
-                    CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
-                  val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
-                  validSegments.foreach { segment =>
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the compaction lock for table" +
+                        s" ${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName}")
+            val loadFolderDetailsArray = SegmentStatusManager
+              .readLoadMetadata(carbonMainTable.getMetadataPath)
+            val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
+              String]()
+            var streamingSegment: Set[String] = Set[String]()
+            loadFolderDetailsArray.foreach(loadMetadataDetails => {
+              if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
+                streamingSegment += loadMetadataDetails.getLoadName
+              }
+              segmentFileNameMap
+                .put(loadMetadataDetails.getLoadName,
+                  String.valueOf(loadMetadataDetails.getLoadStartTime))
+            })
+            val segmentsToMerge =
+              if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+                val validSegments =
+                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+                val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
+                validSegments.foreach { segment =>
+                  // do not add ROW_V1 format
+                  if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
                    validSegmentIds += segment.getSegmentNo
                  }
-                  validSegmentIds
-                } else {
-                  alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
                }
-
-              val loadFolderDetailsArray = SegmentStatusManager
-                .readLoadMetadata(carbonMainTable.getMetadataPath)
-              val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String,
-                String]()
-              loadFolderDetailsArray.foreach(loadMetadataDetails => {
-                segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName,
-                    String.valueOf(loadMetadataDetails.getLoadStartTime))
-              })
-              // in case of merge index file creation using Alter DDL command
-              // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
-              // store (store <= 1.1 version) and create merge Index file as per new store so that
-              // old store is also upgraded to new store
-              val startTime = System.currentTimeMillis()
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sparkSession = sparkSession,
-                segmentIds = segmentsToMerge,
-                segmentFileNameToSegmentIdMap = segmentFileNameMap,
-                tablePath = carbonMainTable.getTablePath,
-                carbonTable = carbonMainTable,
-                mergeIndexProperty = true,
-                readFileFooterFromCarbonDataFile = true)
-              LOGGER.info("Total time taken for merge index "
-                          + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block index Cache
-              MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
-              val requestMessage = "Compaction request completed for table " +
-                                   s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.info(requestMessage)
-            } else {
-              val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }." +
-                                s"${ carbonMainTable.getTableName}"
-              LOGGER.error(lockMessage)
-              CarbonException.analysisException(
-                "Table is already locked for compaction. Please try after some time.")
-            }
-          } finally {
-            lock.unlock()
+                validSegmentIds
+              } else {
+                alterTableMergeIndexEvent.alterTableModel
+                  .customSegmentIds
+                  .get
+                  .filterNot(streamingSegment.contains(_))
+              }
+            // in case of merge index file creation using Alter DDL command
+            // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
+            // store (store <= 1.1 version) and create merge Index file as per new store so that
+            // old store is also upgraded to new store
+            val startTime = System.currentTimeMillis()
+            CarbonMergeFilesRDD.mergeIndexFiles(
+              sparkSession = sparkSession,
+              segmentIds = segmentsToMerge,
+              segmentFileNameToSegmentIdMap = segmentFileNameMap,
+              tablePath = carbonMainTable.getTablePath,
+              carbonTable = carbonMainTable,
+              mergeIndexProperty = true,
+              readFileFooterFromCarbonDataFile = true)
+            LOGGER.info("Total time taken for merge index "
+                        + (System.currentTimeMillis() - startTime) + "ms")
+            // clear Block index Cache
+            MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
+            val requestMessage = "Compaction request completed for table " +
+                                 s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+            LOGGER.info(requestMessage)
+          } else {
+            val lockMessage = "Not able to acquire the compaction lock for table " +
+                              s"${ carbonMainTable.getDatabaseName }." +
+                              s"${ carbonMainTable.getTableName}"
+            LOGGER.error(lockMessage)
+            CarbonException.analysisException(
+              "Table is already locked for compaction. Please try after some time.")
          }
+        } finally {
+          lock.unlock()
        }
      }
    }
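As a side note, a condensed, self-contained sketch of the segment-selection rule applied above (illustrative only; the helper name and the plain-string file format stand in for the real LoadMetadataDetails/FileFormat types): ROW_V1 streaming segments are dropped both from the auto-selected valid segments and from user-supplied custom segment ids before the merge-index job runs.

// Illustrative sketch, not the listener code itself.
// A segment is modelled as (segmentNo, fileFormat); "ROW_V1" marks a streaming segment.
def selectSegmentsToMerge(
    allSegments: Seq[(String, String)],
    customSegmentIds: Option[Seq[String]]): Seq[String] = {
  val streamingSegments = allSegments.collect { case (id, "ROW_V1") => id }.toSet
  customSegmentIds match {
    // user-specified list: drop any ROW_V1 segment ids
    case Some(ids) => ids.filterNot(streamingSegments.contains)
    // no list given: keep every valid segment that is not ROW_V1
    case None => allSegments.collect { case (id, fmt) if fmt != "ROW_V1" => id }
  }
}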
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 2224943..dc50cf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -131,11 +131,6 @@ case class CarbonAlterTableCompactionCommand(
}
Seq.empty
} else if (compactionType == CompactionType.SEGMENT_INDEX) {
-      if (table.isStreamingSink) {
-        throw new MalformedCarbonCommandException(
-          "Unsupported alter operation on carbon table: Merge index is not supported on streaming" +
-          " table")
-      }
val version = CarbonUtil.getFormatVersion(table)
val isOlderVersion = version == ColumnarFormatVersion.V1 ||
version == ColumnarFormatVersion.V2
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index a080db6..09614a8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
@@ -160,6 +160,13 @@ case class CarbonAlterTableAddHivePartitionCommand(
// carbon index files, and it is not good for query performance since all index files
// need to be read to spark driver.
// So, here trigger to merge the index files by sending an event
+      val customSegmentIds = if (loadModel.getCurrentLoadMetadataDetail.getFileFormat
+        .equals(FileFormat.ROW_V1)) {
+        Some(Seq("").toList)
+      } else {
+        Some(Seq(loadModel.getSegmentId).toList)
+      }
val alterTableModel = AlterTableModel(
dbName = Some(table.getDatabaseName),
tableName = table.getTableName,
@@ -167,7 +174,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
compactionType = "", // to trigger index merge, this is not required
factTimeStamp = Some(System.currentTimeMillis()),
alterSql = null,
- customSegmentIds = Some(Seq(loadModel.getSegmentId).toList))
+ customSegmentIds = customSegmentIds)
val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index bb2c63f..6a079b5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -426,14 +426,58 @@ class CarbonIndexFileMergeTestCase
        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
      """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
-    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
-    val exceptionMessage = intercept[Exception] {
-      sql("alter table streamingTable compact 'segment_index'")
-    }.getMessage
-    assert(exceptionMessage.contains("Unsupported alter operation on carbon table: Merge index is not supported on streaming table"))
+    // check for one merge index file
+    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
    sql("DROP TABLE IF EXISTS streamingTable")
  }
+ test("Verify alter table index merge for streaming table") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
"false")
+ sql("DROP TABLE IF EXISTS streamingTable")
+ sql(
+ """
+ | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age
INT)
+ | STORED AS carbondata
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable
OPTIONS('header'='false')")
+ // check for zero merge index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+ // check for one index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.INDEX_FILE_EXT) == 1)
+ sql("alter table streamingTable compact 'segment_index'")
+ sql("alter table streamingTable compact 'segment_index' where segment.id
in (0)")
+ // check for one merge index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ sql("DROP TABLE IF EXISTS streamingTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+ }
+
+ test("Verify alter table index merge for streaming table with custom
segment") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
"false")
+ sql("DROP TABLE IF EXISTS streamingTable")
+ sql(
+ """
+ | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age
INT)
+ | STORED AS carbondata
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable
OPTIONS('header'='false')")
+ // check for zero merge index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+ // check for one index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.INDEX_FILE_EXT) == 1)
+ sql("alter table streamingTable compact 'segment_index' where segment.id
in (0)")
+ // check for one merge index file
+ assert(getIndexFileCount("default_streamingTable", "0",
CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ sql("DROP TABLE IF EXISTS streamingTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+ }
+
test("verify driver cache gets updated after creating merge Index file") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
"false")
@@ -471,7 +515,9 @@ class CarbonIndexFileMergeTestCase
identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
}
- private def getIndexFileCount(tableName: String, segment: String): Int = {
+ private def getIndexFileCount(tableName: String,
+ segment: String,
+ extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
val path = CarbonTablePath
.getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
@@ -479,15 +525,13 @@ class CarbonIndexFileMergeTestCase
FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
.listFiles(true, new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
- file.getName.endsWith(CarbonTablePath
- .INDEX_FILE_EXT)
+ file.getName.endsWith(extension)
}
})
} else {
FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
- file.getName.endsWith(CarbonTablePath
- .INDEX_FILE_EXT)
+ file.getName.endsWith(extension)
}
})
}