Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2624#discussion_r210213840
--- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.format.MergedBloomIndex;
+import org.apache.carbondata.format.MergedBloomIndexHeader;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
+import org.apache.thrift.TBase;
+
+public class BloomIndexFileStore {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
+
+  /*suffix of original generated file*/
+  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+  /*suffix of merged bloom index file*/
+  public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
+  /* directory to store merged bloom index files */
+  public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
+  /**
+   * flag file for merging
+   * if the flag file exists, queries won't use mergeShard
+   * if the flag file does not exist and mergeShard has been generated, queries will use mergeShard
+   */
+  public static final String MERGE_INPROGRESS_FILE = "mergeShard.inprogress";
+
+
+  public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {
+    // get all shard paths of old store
+    CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
+        FileFactory.getFileType(dmSegmentPathString));
+    CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter() {
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.isDirectory() && !file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME);
+      }
+    });
+
+    String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
+    String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;
+    try {
+      // delete mergeShard folder if it exists
+      if (FileFactory.isFileExist(mergeShardPath)) {
+        FileFactory.deleteFile(mergeShardPath, FileFactory.getFileType(mergeShardPath));
+      }
+      // create flag file before creating mergeShard folder
+      if (!FileFactory.isFileExist(mergeInprogressFile)) {
+        FileFactory.createNewFile(
+            mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
+      }
+      // prepare mergeShard output folder
+      if (!FileFactory.mkdirs(mergeShardPath, FileFactory.getFileType(mergeShardPath))) {
+        throw new RuntimeException("Failed to create directory " + mergeShardPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, "Error occurred while creating directory " + mergeShardPath);
+      throw new RuntimeException("Error occurred while creating directory " + mergeShardPath);
+    }
+
+    // for each index column, merge the bloomindex files from all shards into one
+    for (String indexCol : indexCols) {
+      MergedBloomIndexHeader indexHeader = new MergedBloomIndexHeader();
+      MergedBloomIndex mergedBloomIndex = new MergedBloomIndex();
+      List<String> shardNames = new ArrayList<>();
+      List<ByteBuffer> data = new ArrayList<>();
+      try {
+        for (CarbonFile shardPath : shardPaths) {
+          String bloomIndexFile = getBloomIndexFile(shardPath.getCanonicalPath(), indexCol);
+          DataInputStream dataInputStream = FileFactory.getDataInputStream(
+              bloomIndexFile, FileFactory.getFileType(bloomIndexFile));
+          byte[] bytes = new byte[(int) FileFactory.getCarbonFile(bloomIndexFile).getSize()];
+          try {
+            dataInputStream.readFully(bytes);
+            shardNames.add(shardPath.getName());
+            data.add(ByteBuffer.wrap(bytes));
+          } finally {
+            dataInputStream.close();
+          }
+        }
+        indexHeader.setShard_names(shardNames);
+        mergedBloomIndex.setFileData(data);
+        // write segment level file
+        String mergeIndexFileName = getMergeBloomIndexFile(mergeShardPath, indexCol);
+        ThriftWriter thriftWriter = new ThriftWriter(mergeIndexFileName, false);
--- End diff ---
Do you write the merged bloom index through thrift?
I'd prefer to write it in raw bytes, so that this change and the
bloomfilter datamap module do not need to depend on carbon-format directly.
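For example, a simple length-prefixed layout would be enough to keep the
shard-name-to-bloom-index mapping. Just a rough sketch of the idea, not code
from this PR: the class and method names below are made up, it uses plain
java.io for brevity, and the real code would of course go through FileFactory.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class RawMergedBloomIndexIO {

  /**
   * Write the merged bloom index in raw bytes:
   * [shardCount][for each shard: nameLen, nameBytes, dataLen, dataBytes]
   */
  public static void write(String mergeIndexFileName,
      List<String> shardNames, List<byte[]> data) throws IOException {
    try (DataOutputStream out =
        new DataOutputStream(new FileOutputStream(mergeIndexFileName))) {
      out.writeInt(shardNames.size());
      for (int i = 0; i < shardNames.size(); i++) {
        byte[] name = shardNames.get(i).getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.writeInt(data.get(i).length);
        out.write(data.get(i));
      }
    }
  }

  /** Read the same layout back into a shardName -> bloom index bytes map. */
  public static Map<String, byte[]> read(String mergeIndexFileName) throws IOException {
    Map<String, byte[]> result = new LinkedHashMap<>();
    try (DataInputStream in =
        new DataInputStream(new FileInputStream(mergeIndexFileName))) {
      int shardCount = in.readInt();
      for (int i = 0; i < shardCount; i++) {
        byte[] name = new byte[in.readInt()];
        in.readFully(name);
        byte[] bloomData = new byte[in.readInt()];
        in.readFully(bloomData);
        result.put(new String(name, StandardCharsets.UTF_8), bloomData);
      }
    }
    return result;
  }
}
```

With something like this, the merge file stays self-describing and the query
side only needs plain stream APIs to rebuild the shard-to-bytes mapping,
without touching the thrift definitions in carbon-format.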
---