Repository: carbondata Updated Branches: refs/heads/master 5e3aec43e -> 29bae4d28
[CARBONDATA-1883] Improvement in merge index code 1. Improved merge index code 2. Added trigger point for merge index This closes #1643 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/29bae4d2 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/29bae4d2 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/29bae4d2 Branch: refs/heads/master Commit: 29bae4d28629261b5553a679e11c70bfaee06622 Parents: 5e3aec4 Author: dhatchayani <[email protected]> Authored: Tue Dec 12 12:02:15 2017 +0530 Committer: manishgupta88 <[email protected]> Committed: Wed Dec 13 11:49:55 2017 +0530 ---------------------------------------------------------------------- .../CarbonAlterTableCompactionCommand.scala | 33 +++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/29bae4d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 5fdf62a..55eb5c3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -199,22 +199,33 @@ case class CarbonAlterTableCompactionCommand( carbonTable.getAbsoluteTableIdentifier).asScala, carbonLoadModel.getTablePath, carbonTable, true) - lock.unlock() - return + + // trigger event for merge index + val operationContext = new OperationContext + val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = + AlterTableCompactionPreEvent(sqlContext.sparkSession, + carbonTable, + null, + "") + OperationListenerBus.getInstance + .fireEvent(alterTableCompactionPreEvent, operationContext) + + } else { + CarbonDataRDDFactory.startCompactionThreads( + sqlContext, + carbonLoadModel, + storeLocation, + compactionModel, + lock, + operationContext + ) } - CarbonDataRDDFactory.startCompactionThreads( - sqlContext, - carbonLoadModel, - storeLocation, - compactionModel, - lock, - operationContext - ) } catch { case e: Exception => LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") - lock.unlock() throw e + } finally { + lock.unlock() } } else { LOGGER.audit("Not able to acquire the compaction lock for table " +
