Github user qiuchenjian commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r243902213
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
@@ -24,59 +24,88 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
 import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
 class MergeBloomIndexEventListener extends OperationEventListener with Logging {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
     event match {
+      case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
+        // For loading process, segment can not be accessed at this time
+        val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
+        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
+        val segmentId = loadModel.getSegmentId
+
+        // filter out bloom datamap, skip lazy datamap
+        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
+          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
+            DataMapClassProvider.BLOOMFILTER.getShortName))
+          .filter(!_.getDataMapSchema.isLazy).toList
+
+        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
+
+      case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
+        // For compact process, segment can not be accessed at this time
+        val carbonTable = compactPreStatusUpdateEvent.carbonTable
+        val mergedLoadName = compactPreStatusUpdateEvent.mergedLoadName
+        val segmentId = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+
+        // filter out bloom datamap, skip lazy datamap
+        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
+          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
+            DataMapClassProvider.BLOOMFILTER.getShortName))
+          .filter(!_.getDataMapSchema.isLazy).toList
+
+        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
+
       case datamapPostEvent: BuildDataMapPostExecutionEvent =>
-        LOGGER.info("Load post status event-listener called for merge bloom index")
+        // For rebuild datamap process, datamap is disabled when rebuilding
+        if (!datamapPostEvent.isFromRebuild || null == datamapPostEvent.dmName) {
+          // ignore datamapPostEvent from loading and compaction
+          return
+        }
+
         val carbonTableIdentifier = datamapPostEvent.identifier
         val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
-        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
-        val sparkSession = SparkSession.getActiveSession.get
-        // filter out bloom datamap
-        var bloomDatamaps = tableDataMaps.asScala.filter(
-          _.getDataMapSchema.getProviderName.equalsIgnoreCase(
+        // filter out current rebuilt bloom datamap
+        val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
+          .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
             DataMapClassProvider.BLOOMFILTER.getShortName))
-
-        if (datamapPostEvent.isFromRebuild) {
-          if (null != datamapPostEvent.dmName) {
-            // for rebuild process
-            bloomDatamaps = bloomDatamaps.filter(
-              _.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
-          }
-        } else {
-          // for load process, skip lazy datamap
-          bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
-        }
+          .filter(_.getDataMapSchema.getDataMapName.equalsIgnoreCase(datamapPostEvent.dmName))
+          .toList
 
         val segmentIds = datamapPostEvent.segmentIdList
-        if (bloomDatamaps.size > 0 && segmentIds.size > 0) {
-          // we extract bloom datamap name and index columns here
-          // because TableDataMap is not serializable
-          val bloomDMnames = ListBuffer.empty[String]
-          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
-          bloomDatamaps.foreach( dm => {
-            bloomDMnames += dm.getDataMapSchema.getDataMapName
-            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
-          })
-          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
-            segmentIds, bloomDMnames, bloomIndexColumns).collect()
-        }
+        mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, segmentIds)
     }
   }
 
-  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
-    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
+  private def mergeBloomIndex(sparkSession: SparkSession, carbonTable: CarbonTable,
+      bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]) = {
+    if (bloomDatamaps.nonEmpty && segmentIds.nonEmpty) {
+      // we extract bloom datamap name and index columns here
+      // because TableDataMap is not serializable
+      val bloomDMnames = ListBuffer.empty[String]
+      val bloomIndexColumns = ListBuffer.empty[Seq[String]]
+      bloomDatamaps.foreach(dm => {
+        bloomDMnames += dm.getDataMapSchema.getDataMapName
+        bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
+      })
+      LOGGER.info("Start to merge bloom index file")
--- End diff ---
It's better to add the key information to the three log messages, such as the table name, segment IDs, and bloom datamap names.
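
For example, the merge log above could carry that context. A minimal sketch of what this might look like, assuming the imports already present in this diff (`buildMergeLogMessage` is a hypothetical helper name; `getTableUniqueName` and `getDataMapName` are the accessors already used in the patch):

    // Hypothetical helper: enriches the merge log with the table name,
    // segment ids and bloom datamap names, as suggested above.
    private def buildMergeLogMessage(carbonTable: CarbonTable,
        bloomDatamaps: List[TableDataMap], segmentIds: Seq[String]): String = {
      val datamapNames = bloomDatamaps.map(_.getDataMapSchema.getDataMapName).mkString(", ")
      s"Start to merge bloom index file for table ${carbonTable.getTableUniqueName}, " +
        s"segments [${segmentIds.mkString(", ")}], bloom datamaps [$datamapNames]"
    }

    // Possible usage inside mergeBloomIndex:
    // LOGGER.info(buildMergeLogMessage(carbonTable, bloomDatamaps, segmentIds))

The same pattern could apply to the other two log statements, so that each stage of the merge can be traced back to a specific table and segment.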
---