Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2624#discussion_r210213202
--- Diff:
datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.format.MergedBloomIndex;
+import org.apache.carbondata.format.MergedBloomIndexHeader;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
+import org.apache.thrift.TBase;
+
+public class BloomIndexFileStore {
+
+ private static final LogService LOGGER =
+
LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
+
+ /*suffix of original generated file*/
+ public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+ /*suffix of merged bloom index file*/
+ public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
+ /* directory to store merged bloom index files */
+ public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
+ /**
+ * flag file for merging
+ * if flag file exists, query won't use mergeShard
+ * if flag file not exists and mergeShard generated, query will use
mergeShard
+ */
+ public static final String MERGE_INPROGRESS_FILE =
"mergeShard.inprogress";
+
+
+ public static void mergeBloomIndexFile(String dmSegmentPathString,
List<String> indexCols) {
+ // get all shard paths of old store
+ CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
+ FileFactory.getFileType(dmSegmentPathString));
+ CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter()
{
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.isDirectory() &&
!file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME);
+ }
+ });
+
+ String mergeShardPath = dmSegmentPathString + File.separator +
MERGE_BLOOM_INDEX_SHARD_NAME;
+ String mergeInprofressFile = dmSegmentPathString + File.separator +
MERGE_INPROGRESS_FILE;
+ try {
+ // delete mergeShard folder if exists
+ if (FileFactory.isFileExist(mergeShardPath)) {
+ FileFactory.deleteFile(mergeShardPath,
FileFactory.getFileType(mergeShardPath));
+ }
+ // create flag file before creating mergeShard folder
+ if (!FileFactory.isFileExist(mergeInprofressFile)) {
+ FileFactory.createNewFile(
+ mergeInprofressFile,
FileFactory.getFileType(mergeInprofressFile));
+ }
+ // prepare mergeShard output folder
+ if (!FileFactory.mkdirs(mergeShardPath,
FileFactory.getFileType(mergeShardPath))) {
+ throw new RuntimeException("Failed to create directory " +
mergeShardPath);
+ }
+ } catch (IOException e) {
+ LOGGER.error(e, "Error occurs while create directory " +
mergeShardPath);
+ throw new RuntimeException("Error occurs while create directory " +
mergeShardPath);
+ }
+
+ // for each index column, merge the bloomindex files from all shards
into one
+ for (String indexCol: indexCols) {
+ MergedBloomIndexHeader indexHeader = new MergedBloomIndexHeader();
+ MergedBloomIndex mergedBloomIndex = new MergedBloomIndex();
+ List<String> shardNames = new ArrayList<>();
+ List<ByteBuffer> data = new ArrayList<>();
+ try {
+ for (CarbonFile shardPath : shardPaths) {
+ String bloomIndexFile =
getBloomIndexFile(shardPath.getCanonicalPath(), indexCol);
+ DataInputStream dataInputStream = FileFactory.getDataInputStream(
+ bloomIndexFile, FileFactory.getFileType(bloomIndexFile));
+ byte[] bytes = new byte[(int)
FileFactory.getCarbonFile(bloomIndexFile).getSize()];
+ try {
+ dataInputStream.readFully(bytes);
+ shardNames.add(shardPath.getName());
+ data.add(ByteBuffer.wrap(bytes));
+ } finally {
+ dataInputStream.close();
+ }
+ }
+ indexHeader.setShard_names(shardNames);
+ mergedBloomIndex.setFileData(data);
+ // write segment level file
+ String mergeIndexFileName = getMergeBloomIndexFile(mergeShardPath,
indexCol);
+ ThriftWriter thriftWriter = new ThriftWriter(mergeIndexFileName,
false);
+ thriftWriter.open(FileWriteOperation.OVERWRITE);
+ thriftWriter.write(indexHeader);
+ thriftWriter.write(mergedBloomIndex);
+ thriftWriter.close();
+ } catch (IOException e) {
+ LOGGER.error(e, "Error occurs while merge bloom index file of
column: " + indexCol);
+ // delete merge shard of bloom index for this segment when failed
+
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
+ throw new RuntimeException(
+ "Error occurs while merge bloom index file of column: " +
indexCol);
+ }
+ }
+ // delete flag file and mergeShard can be used
+ try {
+ FileFactory.deleteFile(mergeInprofressFile,
FileFactory.getFileType(mergeInprofressFile));
+ } catch (IOException e) {
+ LOGGER.error(e, "Error occurs while deleting file " +
mergeInprofressFile);
+ throw new RuntimeException("Error occurs while deleting file " +
mergeInprofressFile);
+ }
+ // remove old store
+ for (CarbonFile shardpath: shardPaths) {
+ FileFactory.deleteAllCarbonFilesOfDir(shardpath);
+ }
+ }
+
+ /**
+ * load bloom filter from bloom index file
+ * @param shardPath
--- End diff --
remove these useless lines if they do not provide any additional information
---