Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2624#discussion_r210214357
--- Diff:
integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.Partition
+import org.apache.spark.rdd.CarbonMergeFilePartition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.datamap.bloom.BloomIndexFileStore
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+
+/**
+ * RDD to merge all bloomindex files of each segment for bloom datamap.
+ *
+ * @param sc
+ * @param carbonTable
+ * @param segmentIds segments to be merged
+ * @param bloomDatamapNames list of bloom datamap
+ * @param bloomIndexColumns list of index columns correspond to datamap
+ */
+class CarbonMergeBloomIndexFilesRDD(
+ sc: SparkContext,
+ carbonTable: CarbonTable,
+ segmentIds: Seq[String],
+ bloomDatamapNames: Seq[String],
+ bloomIndexColumns: Seq[Seq[String]])
+ extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
+
+ override def getPartitions: Array[Partition] = {
+ segmentIds.zipWithIndex.map {s =>
+ CarbonMergeFilePartition(id, s._2, s._1)
+ }.toArray
+ }
+
+ override def internalCompute(theSplit: Partition, context: TaskContext):
Iterator[String] = {
+ val tablePath = carbonTable.getTablePath
+ val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
+ logInfo("Merging bloom index files of segment : " + split.segmentId)
--- End diff --
s"Merging bloom index files of segment ${split.segmentId} for table ${carbonTable.getTableName}"
---