[CARBONDATA-2223] Adding Listener Support for Partition

Adding Listener Support for Partition
This closes #2031 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f5cdd5ca Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f5cdd5ca Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f5cdd5ca Branch: refs/heads/carbonfile Commit: f5cdd5ca9dcf22984ed300fe1d2d36939755e947 Parents: 98b8550 Author: dhatchayani <[email protected]> Authored: Mon Mar 5 15:17:13 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Tue Mar 20 19:24:18 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 3 +++ .../indexstore/BlockletDataMapIndexStore.java | 14 +++++++++----- .../core/metadata/SegmentFileStore.java | 2 +- .../apache/carbondata/core/util/CarbonUtil.java | 18 +++++++++++++++++- .../hadoop/api/CarbonOutputCommitter.java | 4 ---- .../carbondata/events/AlterTableEvents.scala | 2 +- .../spark/rdd/CarbonTableCompactor.scala | 4 +++- .../scala/org/apache/spark/sql/CarbonEnv.scala | 20 ++++++++++++++++++++ .../management/CarbonLoadDataCommand.scala | 7 ++++++- .../sql/test/Spark2TestQueryExecutor.scala | 1 + .../spark/sql/hive/CarbonSessionState.scala | 2 +- .../processing/loading/events/LoadEvents.java | 11 ----------- .../processing/util/CarbonLoaderUtil.java | 6 +++--- 13 files changed, 65 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 1b135dc..33a1884 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1297,6 +1297,9 @@ public final class CarbonCommonConstants { public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname"; + public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME = + "spark.carbon.common.listener.register.classname"; + @CarbonProperty public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT = "carbon.lease.recovery.retry.count"; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 53ef496..befa121 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -19,8 +19,10 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.carbondata.common.logging.LogService; @@ -81,8 +83,9 @@ public class BlockletDataMapIndexStore if (dataMap == null) { try { SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + Set<String> filesRead = new HashSet<>(); Map<String, BlockMetaInfo> blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore); + getBlockMetaInfoMap(identifier, indexFileStore, filesRead); dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap); } catch 
(MemoryException e) { LOGGER.error("memory exception when loading datamap: " + e.getMessage()); @@ -93,13 +96,14 @@ public class BlockletDataMapIndexStore } private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier, - SegmentIndexFileStore indexFileStore) throws IOException { + SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException { if (identifier.getMergeIndexFileName() != null) { CarbonFile indexMergeFile = FileFactory.getCarbonFile( identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier .getMergeIndexFileName()); - if (indexMergeFile.exists()) { + if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) { indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); + filesRead.add(indexMergeFile.getPath()); } } if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { @@ -151,10 +155,10 @@ public class BlockletDataMapIndexStore } if (missedIdentifiers.size() > 0) { SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); - + Set<String> filesRead = new HashSet<>(); for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) { Map<String, BlockMetaInfo> blockMetaInfoMap = - getBlockMetaInfoMap(identifier, indexFileStore); + getBlockMetaInfoMap(identifier, indexFileStore, filesRead); blockletDataMaps.add( loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 3fc8ad6..4adc977 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -370,7 +370,7 @@ public class SegmentFileStore { for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) { String location = entry.getKey(); if (entry.getValue().isRelative) { - location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; + location = tablePath + location; } if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) { for (String indexFile : entry.getValue().getFiles()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b961b60..06511f8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2326,12 +2326,28 @@ public final class CarbonUtil { throws IOException { long carbonDataSize = 0L; long carbonIndexSize = 0L; + List<String> listOfFilesRead = new ArrayList<>(); HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); if (fileStore.getLocationMap() != null) { fileStore.readIndexFiles(); + Map<String, String> indexFiles = fileStore.getIndexFiles(); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { - carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize(); + // get the size of carbonindex file + String indexFile = entry.getKey(); + String mergeIndexFile = indexFiles.get(indexFile); + if (null != mergeIndexFile) { + String mergeIndexPath = indexFile + .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1) + + mergeIndexFile; + if (!listOfFilesRead.contains(mergeIndexPath)) { + 
carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize(); + listOfFilesRead.add(mergeIndexPath); + } + } else { + carbonIndexSize += FileFactory.getCarbonFile(indexFile).getSize(); + } + // get the size of carbondata files for (String blockFile : entry.getValue()) { carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 7ea11bd..4634b06 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -160,13 +160,9 @@ public class CarbonOutputCommitter extends FileOutputCommitter { if (operationContext != null) { LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent = new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel); - LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent = - new LoadEvents.LoadTableMergePartitionEvent(readPath); try { OperationListenerBus.getInstance() .fireEvent(postStatusUpdateEvent, (OperationContext) operationContext); - OperationListenerBus.getInstance() - .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext); } catch (Exception e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 671e132..538df4a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -185,7 +185,7 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession, * Compaction Event for handling post update status file operations, like committing child * datamaps in one transaction */ -case class AlterTableCompactionPostStatusUpdateEvent( +case class AlterTableCompactionPostStatusUpdateEvent(sparkSession: SparkSession, carbonTable: CarbonTable, carbonMergerMapping: CarbonMergerMapping, carbonLoadModel: CarbonLoadModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 231b748..a987127 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -246,7 +246,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, carbonLoadModel, compactionType, segmentFileName) - val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable, + + val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession, + carbonTable, carbonMergerMapping, carbonLoadModel, mergedLoadName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 8c3ca0f..95bbd29 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive.{HiveSessionCatalog, _} +import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -135,6 +136,17 @@ object CarbonEnv { } /** + * Method + * 1. To initialize Listeners to their respective events in the OperationListenerBus + * 2. To register common listeners + * + */ + def init(sparkSession: SparkSession): Unit = { + initListeners + registerCommonListener(sparkSession) + } + + /** * Method to initialize Listeners to their respective events in the OperationListenerBus. 
*/ def initListeners(): Unit = { @@ -158,6 +170,14 @@ object CarbonEnv { .addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener) } + def registerCommonListener(sparkSession: SparkSession): Unit = { + val clsName = sparkSession.sparkContext.conf + .get(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME) + if (null != clsName && !clsName.isEmpty) { + CarbonReflectionUtils.createObject(clsName) + } + } + /** * Return carbon table instance from cache or by looking up table in `sparkSession` */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index eb00ebf..18c268c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -68,7 +68,7 @@ import org.apache.carbondata.events.exception.PreEventException import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException import 
org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -767,6 +767,11 @@ case class CarbonLoadDataCommand( carbonLoadModel, table, operationContext) + + val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = + new LoadTablePreStatusUpdateEvent(table.getCarbonTableIdentifier, carbonLoadModel) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) + } catch { case e: Exception => throw new Exception( http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala index b341d6a..d30e96d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala @@ -67,6 +67,7 @@ object Spark2TestQueryExecutor { .enableHiveSupport() .config("spark.sql.warehouse.dir", warehouse) .config("spark.sql.crossJoin.enabled", "true") + .config(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME, "") .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb) if (warehouse.startsWith("hdfs://")) { System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index ba2fe947..d381144 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -79,7 +79,7 @@ class CarbonSessionCatalog( } // Initialize all listeners to the Operation bus. - CarbonEnv.initListeners() + CarbonEnv.init(sparkSession) /** * This method will invalidate carbonrelation from cache if carbon table is updated in http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java index a3fa292..50ebc34 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java @@ -182,15 +182,4 @@ public class LoadEvents { } } - public static class LoadTableMergePartitionEvent extends Event { - private String segmentPath; - - public LoadTableMergePartitionEvent(String segmentPath) { - this.segmentPath = segmentPath; - } - - public String getSegmentPath() { - return segmentPath; - } - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 922a7ee..65827b0 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -1103,14 +1103,14 @@ public final class CarbonLoaderUtil { * Merge index files with in the segment of partitioned table * @param segmentId * @param tablePath - * @param uniqueId * @return * @throws IOException */ - public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath, - String uniqueId) throws IOException { + public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath) + throws IOException { CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus = new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath); + String uniqueId = ""; if (segmentIndexFIleMergeStatus != null) { uniqueId = System.currentTimeMillis() + ""; String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
