ajantha-bhat commented on a change in pull request #3785:
URL: https://github.com/apache/carbondata/pull/3785#discussion_r457056046
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -104,73 +104,80 @@ class MergeIndexEventListener extends
OperationEventListener with Logging {
case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
val carbonMainTable = alterTableMergeIndexEvent.carbonTable
val sparkSession = alterTableMergeIndexEvent.sparkSession
- if (!carbonMainTable.isStreamingSink) {
- LOGGER.info(s"Merge Index request received for table " +
- s"${ carbonMainTable.getDatabaseName }.${
carbonMainTable.getTableName }")
- val lock = CarbonLockFactory.getCarbonLockObj(
- carbonMainTable.getAbsoluteTableIdentifier,
- LockUsage.COMPACTION_LOCK)
+ LOGGER.info(s"Merge Index request received for table " +
+ s"${ carbonMainTable.getDatabaseName }.${
carbonMainTable.getTableName }")
+ val lock = CarbonLockFactory.getCarbonLockObj(
+ carbonMainTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
- try {
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the compaction lock for table" +
- s" ${ carbonMainTable.getDatabaseName }.${
- carbonMainTable
- .getTableName
- }")
- val segmentsToMerge =
- if
(alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
- val validSegments =
-
CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
- val validSegmentIds: mutable.Buffer[String] =
mutable.Buffer[String]()
- validSegments.foreach { segment =>
+ try {
+ if (lock.lockWithRetries()) {
+ LOGGER.info("Acquired the compaction lock for table" +
+ s" ${ carbonMainTable.getDatabaseName }.${
+ carbonMainTable
+ .getTableName
+ }")
+ val loadFolderDetailsArray = SegmentStatusManager
+ .readLoadMetadata(carbonMainTable.getMetadataPath)
+ val segmentFileNameMap: java.util.Map[String, String] = new
util.HashMap[String,
+ String]()
+ var streamingSegment: Set[String] = Set[String]()
+ loadFolderDetailsArray.foreach(loadMetadataDetails => {
+ if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1))
{
+ streamingSegment += loadMetadataDetails.getLoadName
+ }
+ segmentFileNameMap
+ .put(loadMetadataDetails.getLoadName,
+ String.valueOf(loadMetadataDetails.getLoadStartTime))
+ })
+ val segmentsToMerge =
+ if
(alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
+ val validSegments =
+
CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+ val validSegmentIds: mutable.Buffer[String] =
mutable.Buffer[String]()
+ validSegments.foreach { segment =>
+ // do not add ROW_V1 format
+ if
(!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) {
validSegmentIds += segment.getSegmentNo
}
- validSegmentIds
- } else {
-
alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get
}
-
- val loadFolderDetailsArray = SegmentStatusManager
- .readLoadMetadata(carbonMainTable.getMetadataPath)
- val segmentFileNameMap: java.util.Map[String, String] = new
util.HashMap[String,
- String]()
- loadFolderDetailsArray.foreach(loadMetadataDetails => {
- segmentFileNameMap
- .put(loadMetadataDetails.getLoadName,
- String.valueOf(loadMetadataDetails.getLoadStartTime))
- })
- // in case of merge index file creation using Alter DDL command
- // readFileFooterFromCarbonDataFile flag should be true. This
flag is check for legacy
- // store (store <= 1.1 version) and create merge Index file as
per new store so that
- // old store is also upgraded to new store
- val startTime = System.currentTimeMillis()
- CarbonMergeFilesRDD.mergeIndexFiles(
- sparkSession = sparkSession,
- segmentIds = segmentsToMerge,
- segmentFileNameToSegmentIdMap = segmentFileNameMap,
- tablePath = carbonMainTable.getTablePath,
- carbonTable = carbonMainTable,
- mergeIndexProperty = true,
- readFileFooterFromCarbonDataFile = true)
- LOGGER.info("Total time taken for merge index "
- + (System.currentTimeMillis() - startTime) + "ms")
- // clear Block index Cache
- MergeIndexUtil.clearBlockIndexCache(carbonMainTable,
segmentsToMerge)
- val requestMessage = "Compaction request completed for table " +
- s"${ carbonMainTable.getDatabaseName }.${
carbonMainTable.getTableName }"
- LOGGER.info(requestMessage)
- } else {
- val lockMessage = "Not able to acquire the compaction lock for
table " +
- s"${ carbonMainTable.getDatabaseName }." +
- s"${ carbonMainTable.getTableName}"
- LOGGER.error(lockMessage)
- CarbonException.analysisException(
- "Table is already locked for compaction. Please try after some
time.")
- }
- } finally {
- lock.unlock()
+ validSegmentIds
+ } else {
+ alterTableMergeIndexEvent.alterTableModel
+ .customSegmentIds
+ .get
+ .filter(!streamingSegment.contains(_))
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]