This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch bloomfilter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d797dff895417f75b23c85281b37ab3ae1880be5 Author: kishoreg <[email protected]> AuthorDate: Mon Nov 19 18:29:39 2018 -0800 Adding support for bloom filter --- .../pinot/common/config/IndexingConfig.java | 12 ++ .../linkedin/pinot/common/config/TableConfig.java | 7 + .../pinot/common/segment/SegmentMetadata.java | 3 + .../com/linkedin/pinot/core/common/DataSource.java | 3 + .../indexsegment/mutable/MutableSegmentImpl.java | 4 +- .../core/query/pruner/AbstractSegmentPruner.java | 13 +- .../query/pruner/ColumnValueSegmentPruner.java | 44 ++++-- .../core/query/pruner/PartitionSegmentPruner.java | 10 +- .../core/segment/creator/impl/V1Constants.java | 1 + .../creator/impl/bloom/BloomFilterCreator.java | 51 +++++++ .../core/segment/index/SegmentMetadataImpl.java | 5 + .../segment/index/column/ColumnIndexContainer.java | 3 + .../index/column/PhysicalColumnIndexContainer.java | 23 ++- .../index/data/source/ColumnDataSource.java | 19 ++- .../segment/index/loader/IndexLoadingConfig.java | 17 +++ .../segment/index/loader/SegmentPreProcessor.java | 6 +- .../loader/bloomfilter/BloomFilterHandler.java | 158 +++++++++++++++++++++ .../segment/index/readers/BloomFilterReader.java | 32 +++++ .../core/segment/store/ColumnIndexDirectory.java | 18 ++- .../pinot/core/segment/store/ColumnIndexType.java | 3 +- .../core/segment/store/FilePerIndexDirectory.java | 17 +++ .../segment/store/SegmentLocalFSDirectory.java | 6 + .../segment/store/SingleFileIndexDirectory.java | 12 ++ .../virtualcolumn/VirtualColumnIndexContainer.java | 6 + .../v2/store/StarTreeDimensionDataSource.java | 6 + .../v2/store/StarTreeMetricDataSource.java | 8 ++ .../core/common/RealtimeNoDictionaryTest.java | 8 +- .../index/creator/BloomFilterCreatorTest.java | 71 +++++++++ .../query/pruner/ColumnValueSegmentPrunerTest.java | 4 +- .../pinot/tools/perf/PerfBenchmarkDriver.java | 6 +- .../pinot/tools/perf/PerfBenchmarkRunner.java | 16 ++- pom.xml | 1 + 32 files changed, 552 insertions(+), 41 deletions(-) diff --git 
a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java index b57825c..35de1e3 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/IndexingConfig.java @@ -43,6 +43,9 @@ public class IndexingConfig { @ConfigKey("sortedColumn") private List<String> _sortedColumn = new ArrayList<>(); + @ConfigKey("bloomFilterColumns") + private List<String> _bloomFilterColumns = new ArrayList<>(); + @ConfigKey("loadMode") private String _loadMode; @@ -113,6 +116,15 @@ public class IndexingConfig { _sortedColumn = sortedColumn; } + + public List<String> getBloomFilterColumns() { + return _bloomFilterColumns; + } + + public void setBloomFilterColumns(List<String> _bloomFilterColumns) { + this._bloomFilterColumns = _bloomFilterColumns; + } + public String getLoadMode() { return _loadMode; } diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java index 960c7f0..373f369 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java @@ -394,6 +394,7 @@ public class TableConfig { private List<String> _invertedIndexColumns; private List<String> _noDictionaryColumns; private List<String> _onHeapDictionaryColumns; + private List<String> _bloomFilterColumns; private Map<String, String> _streamConfigs; private String _streamPartitionAssignmentStrategy = DEFAULT_STREAM_PARTITION_ASSIGNMENT_STRATEGY; @@ -508,6 +509,11 @@ public class TableConfig { return this; } + public Builder setBloomFilterColumns(List<String> bloomFilterColumns) { + _bloomFilterColumns = bloomFilterColumns; + return this; + } + public Builder setNoDictionaryColumns(List<String> noDictionaryColumns) 
{ _noDictionaryColumns = noDictionaryColumns; return this; @@ -583,6 +589,7 @@ public class TableConfig { indexingConfig.setNoDictionaryColumns(_noDictionaryColumns); indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns); indexingConfig.setStreamConfigs(_streamConfigs); + indexingConfig.setBloomFilterColumns(_bloomFilterColumns); StreamConsumptionConfig streamConsumptionConfig = new StreamConsumptionConfig(); streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy); indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig); diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java index 3a2d1bb..aaf53fe 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/segment/SegmentMetadata.java @@ -82,6 +82,8 @@ public interface SegmentMetadata { String getBitmapInvertedIndexFileName(String column); + String getBloomFilterFileName(String column); + String getCreatorName(); char getPaddingCharacter(); @@ -99,4 +101,5 @@ public interface SegmentMetadata { String getDerivedColumn(String column, MetricFieldSpec.DerivedMetricType derivedMetricType); boolean close(); + } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java index b44d721..6397024 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/DataSource.java @@ -16,6 +16,7 @@ package com.linkedin.pinot.core.common; import com.linkedin.pinot.core.operator.BaseOperator; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import 
com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; @@ -26,4 +27,6 @@ public abstract class DataSource extends BaseOperator { public abstract InvertedIndexReader getInvertedIndex(); public abstract Dictionary getDictionary(); + + public abstract BloomFilterReader getBloomFilter(); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index a32068a..7c356a0 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -35,6 +35,7 @@ import com.linkedin.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndex import com.linkedin.pinot.core.segment.creator.impl.V1Constants; import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; import com.linkedin.pinot.core.segment.index.data.source.ColumnDataSource; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnContext; import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProvider; import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory; @@ -79,6 +80,7 @@ public class MutableSegmentImpl implements MutableSegment { private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>(); private final Map<String, Integer> _maxNumValuesMap = new HashMap<>(); private final Map<String, RealtimeInvertedIndexReader> _invertedIndexMap = new HashMap<>(); + private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>(); private final IdMap<FixedIntArray> _recordIdMap; private boolean _aggregateMetrics; @@ -386,7 +388,7 @@ public class MutableSegmentImpl implements MutableSegment { public ColumnDataSource getDataSource(String columnName) { if 
(!_schema.isVirtualColumn(columnName)) { return new ColumnDataSource(_schema.getFieldSpecFor(columnName), _numDocsIndexed, _maxNumValuesMap.get(columnName), - _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName)); + _indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName), _dictionaryMap.get(columnName), _bloomFilterMap.get(columnName)); } else { return getVirtualDataSource(columnName); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java index 258fae9..b23c438 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/AbstractSegmentPruner.java @@ -22,6 +22,8 @@ import com.linkedin.pinot.common.request.FilterOperator; import com.linkedin.pinot.common.utils.request.FilterQueryTree; import com.linkedin.pinot.core.query.exception.BadQueryRequestException; import com.linkedin.pinot.core.segment.index.ColumnMetadata; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; + import javax.annotation.Nonnull; @@ -32,7 +34,8 @@ import javax.annotation.Nonnull; */ public abstract class AbstractSegmentPruner implements SegmentPruner { - public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap); + public abstract boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap, + Map<String, BloomFilterReader> bloomFilterMap); /** * Given a non leaf filter query tree node prunes it as follows: @@ -46,8 +49,8 @@ public abstract class AbstractSegmentPruner implements SegmentPruner { * * @return True to prune, false otherwise */ - protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree, - @Nonnull Map<String, ColumnMetadata> 
columnMetadataMap) { + protected boolean pruneNonLeaf(@Nonnull FilterQueryTree filterQueryTree, @Nonnull Map<String, ColumnMetadata> columnMetadataMap, + Map<String, BloomFilterReader> bloomFilterMap) { List<FilterQueryTree> children = filterQueryTree.getChildren(); if (children.isEmpty()) { @@ -58,7 +61,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner { switch (filterOperator) { case AND: for (FilterQueryTree child : children) { - if (pruneSegment(child, columnMetadataMap)) { + if (pruneSegment(child, columnMetadataMap, bloomFilterMap)) { return true; } } @@ -66,7 +69,7 @@ public abstract class AbstractSegmentPruner implements SegmentPruner { case OR: for (FilterQueryTree child : children) { - if (!pruneSegment(child, columnMetadataMap)) { + if (!pruneSegment(child, columnMetadataMap, bloomFilterMap)) { return false; } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java index a2953ce..40c34f7 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/ColumnValueSegmentPruner.java @@ -23,8 +23,13 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment; import com.linkedin.pinot.core.query.request.ServerQueryRequest; import com.linkedin.pinot.core.segment.index.ColumnMetadata; import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; + +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import javax.annotation.Nonnull; import org.apache.commons.configuration.Configuration; @@ -48,7 +53,15 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner { // For realtime segment, this map can be null. 
Map<String, ColumnMetadata> columnMetadataMap = ((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap(); - return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap); + + Map<String, BloomFilterReader> bloomFilterMap = new HashMap<>(); + for (String column : columnMetadataMap.keySet()) { + BloomFilterReader bloomFilterReader = segment.getDataSource(column).getBloomFilter(); + if (bloomFilterReader != null) { + bloomFilterMap.put(column, bloomFilterReader); + } + } + return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, bloomFilterMap); } @Override @@ -74,7 +87,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner { @SuppressWarnings("unchecked") @Override public boolean pruneSegment(@Nonnull FilterQueryTree filterQueryTree, - @Nonnull Map<String, ColumnMetadata> columnMetadataMap) { + @Nonnull Map<String, ColumnMetadata> columnMetadataMap, Map<String, BloomFilterReader> bloomFilterMap) { FilterOperator filterOperator = filterQueryTree.getOperator(); List<FilterQueryTree> children = filterQueryTree.getChildren(); @@ -86,7 +99,8 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner { return false; } - ColumnMetadata columnMetadata = columnMetadataMap.get(filterQueryTree.getColumn()); + String column = filterQueryTree.getColumn(); + ColumnMetadata columnMetadata = columnMetadataMap.get(column); if (columnMetadata == null) { // Should not reach here after DataSchemaSegmentPruner return true; @@ -97,16 +111,22 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner { if (filterOperator == FilterOperator.EQUALITY) { // EQUALITY - - // Doesn't have min/max value set in metadata - if ((minValue == null) || (maxValue == null)) { - return false; - } - - // Check if the value is in the min/max range + boolean prune = false; FieldSpec.DataType dataType = columnMetadata.getDataType(); Comparable value = getValue(filterQueryTree.getValue().get(0), 
dataType); - return (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0); + // Doesn't have min/max value set in metadata + if (minValue != null && maxValue != null) { + // Check if the value is in the min/max range + prune = (value.compareTo(minValue) < 0) || (value.compareTo(maxValue) > 0); + } + //check bloom filter if it exists + if( !prune && bloomFilterMap.containsKey(column)) { + BloomFilterReader bloomFilterReader = bloomFilterMap.get(column); + if(bloomFilterReader != null){ + prune = !bloomFilterReader.mightContain(value); + } + } + return prune; } else { // RANGE @@ -170,7 +190,7 @@ public class ColumnValueSegmentPruner extends AbstractSegmentPruner { } } else { // Parent node - return pruneNonLeaf(filterQueryTree, columnMetadataMap); + return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap); } } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java index f8e1cd9..6846fae 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/query/pruner/PartitionSegmentPruner.java @@ -22,6 +22,9 @@ import com.linkedin.pinot.core.indexsegment.IndexSegment; import com.linkedin.pinot.core.query.request.ServerQueryRequest; import com.linkedin.pinot.core.segment.index.ColumnMetadata; import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; + +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.commons.configuration.Configuration; @@ -59,7 +62,8 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner { Map<String, ColumnMetadata> columnMetadataMap = ((SegmentMetadataImpl) segment.getSegmentMetadata()).getColumnMetadataMap(); - return (columnMetadataMap != null) && 
pruneSegment(filterQueryTree, columnMetadataMap); + Map<String, BloomFilterReader> emptyMap = Collections.emptyMap(); + return (columnMetadataMap != null) && pruneSegment(filterQueryTree, columnMetadataMap, emptyMap); } /** @@ -74,12 +78,12 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner { * @return True if segment can be pruned, false otherwise */ @Override - public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap) { + public boolean pruneSegment(FilterQueryTree filterQueryTree, Map<String, ColumnMetadata> columnMetadataMap, Map<String, BloomFilterReader> bloomFilterMap) { List<FilterQueryTree> children = filterQueryTree.getChildren(); // Non-leaf node if (children != null && !children.isEmpty()) { - return pruneNonLeaf(filterQueryTree, columnMetadataMap); + return pruneNonLeaf(filterQueryTree, columnMetadataMap, bloomFilterMap); } // TODO: Enhance partition based pruning for RANGE operator. diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java index 772d5a3..c3facda 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/V1Constants.java @@ -57,6 +57,7 @@ public class V1Constants { public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd"; public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd"; public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv"; + public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom"; } public static class MetadataKeys { diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java new file 
mode 100644 index 0000000..64579b0 --- /dev/null +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.core.segment.creator.impl.bloom; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; + +import com.clearspring.analytics.stream.membership.BloomFilter; +import com.linkedin.pinot.common.data.FieldSpec; +import com.linkedin.pinot.core.segment.creator.impl.V1Constants; + +public class BloomFilterCreator implements AutoCloseable { + + BloomFilter _bloomFilter; + File _bloomFilterFile; + + public BloomFilterCreator(File indexDir, FieldSpec fieldSpec, int cardinality, int numDocs, int totalNumberOfEntries) { + String columnName = fieldSpec.getName(); + _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); + _bloomFilter = new BloomFilter(cardinality, .03); + + } + + @Override + public void close() throws Exception { + byte[] buffer = BloomFilter.serialize(_bloomFilter); + try (DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(_bloomFilterFile)))) { + dataOutputStream.write(buffer); + } + } + + public void add(Object input) { + _bloomFilter.add(input.toString()); + } + +} 
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java index 4b1c8af..5d8f164 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/SegmentMetadataImpl.java @@ -555,6 +555,11 @@ public class SegmentMetadataImpl implements SegmentMetadata { return column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION; } + @Override + public String getBloomFilterFileName(String column) { + return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION; + } + @Nullable @Override public String getCreatorName() { diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java index 8850ed2..6e4e515 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/ColumnIndexContainer.java @@ -16,6 +16,7 @@ package com.linkedin.pinot.core.segment.index.column; import com.linkedin.pinot.core.io.reader.DataFileReader; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; @@ -39,4 +40,6 @@ public interface ColumnIndexContainer { * Returns the dictionary for the column, or {@code null} if it does not exist. 
*/ Dictionary getDictionary(); + + BloomFilterReader getBloomFilter(); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index fb78751..2af188b 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -27,6 +27,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import com.linkedin.pinot.core.segment.index.ColumnMetadata; import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig; import com.linkedin.pinot.core.segment.index.readers.BitmapInvertedIndexReader; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.BytesDictionary; import com.linkedin.pinot.core.segment.index.readers.DoubleDictionary; import com.linkedin.pinot.core.segment.index.readers.FloatDictionary; @@ -44,6 +45,7 @@ import com.linkedin.pinot.core.segment.memory.PinotDataBuffer; import com.linkedin.pinot.core.segment.store.ColumnIndexType; import com.linkedin.pinot.core.segment.store.SegmentDirectory; import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +56,29 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final DataFileReader _forwardIndex; private final InvertedIndexReader _invertedIndex; private final ImmutableDictionaryReader _dictionary; - + private final BloomFilterReader _bloomFilterReader; + public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata, IndexLoadingConfig indexLoadingConfig) throws IOException { String columnName = metadata.getColumnName(); boolean loadInvertedIndex = false; boolean 
loadOnHeapDictionary = false; + boolean loadBloomFilter = false; if (indexLoadingConfig != null) { loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName); loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName); + loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName); } PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX); + if (metadata.hasDictionary()) { + //bloom filter + if (loadBloomFilter) { + PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER); + _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer, metadata.getDataType()); + } else { + _bloomFilterReader = null; + } // Dictionary-based index _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata, loadOnHeapDictionary); @@ -100,6 +113,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); _invertedIndex = null; _dictionary = null; + _bloomFilterReader = null; } } @@ -117,6 +131,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer public ImmutableDictionaryReader getDictionary() { return _dictionary; } + + @Override + public BloomFilterReader getBloomFilter() { + return _bloomFilterReader; + } + private static ImmutableDictionaryReader loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata, boolean loadOnHeap) { @@ -175,4 +195,5 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer throw new IllegalStateException("Illegal data type for raw forward index: " + dataType); } } + } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java index 16c79f4..c8e6af3 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/data/source/ColumnDataSource.java @@ -30,6 +30,7 @@ import com.linkedin.pinot.core.operator.blocks.SingleValueBlock; import com.linkedin.pinot.core.realtime.impl.dictionary.MutableDictionary; import com.linkedin.pinot.core.segment.index.ColumnMetadata; import com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; @@ -44,6 +45,7 @@ public final class ColumnDataSource extends DataSource { private final DataFileReader _forwardIndex; private final InvertedIndexReader _invertedIndex; private final Dictionary _dictionary; + private final BloomFilterReader _bloomFilter; private final int _cardinality; private final DataSourceMetadata _metadata; @@ -53,21 +55,22 @@ public final class ColumnDataSource extends DataSource { public ColumnDataSource(ColumnIndexContainer indexContainer, ColumnMetadata metadata) { this(metadata.getColumnName(), metadata.getDataType(), metadata.isSingleValue(), metadata.isSorted(), metadata.getTotalDocs(), metadata.getMaxNumberOfMultiValues(), indexContainer.getForwardIndex(), - indexContainer.getInvertedIndex(), indexContainer.getDictionary(), metadata.getCardinality()); + indexContainer.getInvertedIndex(), indexContainer.getDictionary(), indexContainer.getBloomFilter(), + metadata.getCardinality()); } /** * For REALTIME segment. 
*/ public ColumnDataSource(FieldSpec fieldSpec, int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, - InvertedIndexReader invertedIndex, MutableDictionary dictionary) { + InvertedIndexReader invertedIndex, MutableDictionary dictionary, BloomFilterReader bloomFilter) { this(fieldSpec.getName(), fieldSpec.getDataType(), fieldSpec.isSingleValueField(), false, numDocs, - maxNumMultiValues, forwardIndex, invertedIndex, dictionary, Constants.UNKNOWN_CARDINALITY); + maxNumMultiValues, forwardIndex, invertedIndex, dictionary, bloomFilter, Constants.UNKNOWN_CARDINALITY); } private ColumnDataSource(String columnName, FieldSpec.DataType dataType, boolean isSingleValue, boolean isSorted, - int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, InvertedIndexReader invertedIndex, - Dictionary dictionary, int cardinality) { + int numDocs, int maxNumMultiValues, DataFileReader forwardIndex, InvertedIndexReader invertedIndex, + Dictionary dictionary, BloomFilterReader bloomFilterReader, int cardinality) { // Sanity check if (isSingleValue) { Preconditions.checkState(forwardIndex instanceof SingleColumnSingleValueReader); @@ -93,6 +96,7 @@ public final class ColumnDataSource extends DataSource { _forwardIndex = forwardIndex; _invertedIndex = invertedIndex; _dictionary = dictionary; + _bloomFilter = bloomFilterReader; _cardinality = cardinality; _metadata = new DataSourceMetadata() { @@ -154,6 +158,11 @@ public final class ColumnDataSource extends DataSource { } @Override + public BloomFilterReader getBloomFilter() { + return _bloomFilter; + } + + @Override protected Block getNextBlock() { if (_isSingleValue) { return new SingleValueBlock((SingleColumnSingleValueReader) _forwardIndex, _numDocs, _dataType, _dictionary); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java index 6381014..7c54c90 100644 --- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -44,6 +44,8 @@ public class IndexLoadingConfig { private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig. private Map<String, String> _noDictionaryConfig = new HashMap<>(); private Set<String> _onHeapDictionaryColumns = new HashSet<>(); + private Set<String> _bloomFilterColumns = new HashSet<>(); + private SegmentVersion _segmentVersion; // This value will remain true only when the empty constructor is invoked. private boolean _enableDefaultColumns = true; @@ -76,6 +78,11 @@ public class IndexLoadingConfig { _invertedIndexColumns.addAll(invertedIndexColumns); } + List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns(); + if (bloomFilterColumns != null) { + _bloomFilterColumns.addAll(bloomFilterColumns); + } + List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns(); if (noDictionaryColumns != null) { _noDictionaryColumns.addAll(noDictionaryColumns); @@ -164,6 +171,12 @@ public class IndexLoadingConfig { } @VisibleForTesting + public void setBloomFilterColumns(@Nonnull Set<String> bloomFilterColumns) { + _bloomFilterColumns = bloomFilterColumns; + } + + + @VisibleForTesting public void setOnHeapDictionaryColumns(@Nonnull Set<String> onHeapDictionaryColumns) { _onHeapDictionaryColumns = onHeapDictionaryColumns; } @@ -183,6 +196,10 @@ public class IndexLoadingConfig { return _onHeapDictionaryColumns; } + public Set<String> getBloomFilterColumns() { + return _bloomFilterColumns; + } + @Nullable public SegmentVersion getSegmentVersion() { return _segmentVersion; diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java index 6cf064d..93d9f15 100644 
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/SegmentPreProcessor.java @@ -19,6 +19,7 @@ import com.linkedin.pinot.common.data.Schema; import com.linkedin.pinot.common.segment.ReadMode; import com.linkedin.pinot.core.segment.creator.impl.V1Constants; import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; +import com.linkedin.pinot.core.segment.index.loader.bloomfilter.BloomFilterHandler; import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGenerator; import com.linkedin.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode; import com.linkedin.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler; @@ -63,7 +64,6 @@ public class SegmentPreProcessor implements AutoCloseable { if (_segmentMetadata.getTotalDocs() == 0) { return; } - // Remove all the existing inverted index temp files before loading segments. // NOTE: This step fixes the issue of temporary files not getting deleted after creating new inverted indexes. // In this, we look for all files in the directory and remove the ones with '.bitmap.inv.tmp' extension. @@ -91,6 +91,10 @@ public class SegmentPreProcessor implements AutoCloseable { new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); invertedIndexHandler.createInvertedIndices(); + BloomFilterHandler bloomFilterHandler = + new BloomFilterHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter); + bloomFilterHandler.createBloomFilters(); + // Add min/max value to column metadata according to the prune mode. // For star-tree index, because it can only increase the range, so min/max value can still be used in pruner. 
ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode = diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java new file mode 100644 index 0000000..d5494b8 --- /dev/null +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java @@ -0,0 +1,158 @@ +package com.linkedin.pinot.core.segment.index.loader.bloomfilter; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import javax.annotation.Nonnull; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linkedin.pinot.common.data.FieldSpec; +import com.linkedin.pinot.common.data.FieldSpec.DataType; +import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion; +import com.linkedin.pinot.core.io.reader.DataFileReader; +import com.linkedin.pinot.core.io.reader.ReaderContext; +import com.linkedin.pinot.core.io.reader.SingleColumnSingleValueReader; +import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader; +import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader; +import com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader; +import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; +import com.linkedin.pinot.core.segment.creator.impl.V1Constants; +import com.linkedin.pinot.core.segment.creator.impl.bloom.BloomFilterCreator; +import com.linkedin.pinot.core.segment.index.ColumnMetadata; +import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; +import com.linkedin.pinot.core.segment.index.loader.IndexLoadingConfig; +import com.linkedin.pinot.core.segment.index.loader.LoaderUtils; +import com.linkedin.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler; +import 
com.linkedin.pinot.core.segment.index.readers.DoubleDictionary; +import com.linkedin.pinot.core.segment.index.readers.FloatDictionary; +import com.linkedin.pinot.core.segment.index.readers.ImmutableDictionaryReader; +import com.linkedin.pinot.core.segment.index.readers.IntDictionary; +import com.linkedin.pinot.core.segment.index.readers.LongDictionary; +import com.linkedin.pinot.core.segment.index.readers.StringDictionary; +import com.linkedin.pinot.core.segment.memory.PinotDataBuffer; +import com.linkedin.pinot.core.segment.store.ColumnIndexType; +import com.linkedin.pinot.core.segment.store.SegmentDirectory; +/** Creates missing bloom filters for the configured columns of a segment; only columns with a dictionary are processed. */ +public class BloomFilterHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterHandler.class); + + private final File _indexDir; + private final SegmentDirectory.Writer _segmentWriter; + private final String _segmentName; + private final SegmentVersion _segmentVersion; + private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>(); + + public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata, @Nonnull IndexLoadingConfig indexLoadingConfig, + @Nonnull SegmentDirectory.Writer segmentWriter) { + _indexDir = indexDir; + _segmentWriter = segmentWriter; + _segmentName = segmentMetadata.getName(); + _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion()); + + // Keep only the configured bloom filter columns that have metadata in this segment + for (String column : indexLoadingConfig.getBloomFilterColumns()) { + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + if (columnMetadata != null) { + _bloomFilterColumns.add(columnMetadata); + } + } + } + /** Creates a bloom filter for every collected column that has a dictionary; columns without one are skipped. */ + public void createBloomFilters() throws Exception { + for (ColumnMetadata columnMetadata : _bloomFilterColumns) { + if(columnMetadata.hasDictionary()) { + createBloomFilterForColumn(columnMetadata); + } + } + } + + private void createBloomFilterForColumn(ColumnMetadata columnMetadata) throws Exception { + String column = 
columnMetadata.getColumnName(); + + File inProgress = new File(_indexDir, column + ".bloom.inprogress"); + File bloomFilterFile = new File(_indexDir, column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run ended normally. + + if (_segmentWriter.hasIndexFor(column, ColumnIndexType.BLOOM_FILTER)) { + // Skip creating bloom filter index if already exists. + + LOGGER.info("Found bloom filter for segment: {}, column: {}", _segmentName, column); + return; + } + + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run gets interrupted. + + // Remove bloom filter file if exists. + // For v1 and v2, it's the actual bloom filter file. For v3, it's the temporary bloom filter file. + FileUtils.deleteQuietly(bloomFilterFile); + } + + // Create new bloom filter for the column. + LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, column); + int numDocs = columnMetadata.getTotalDocs(); + try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs, + columnMetadata.getTotalNumberOfEntries())) { + if (columnMetadata.hasDictionary()) { + //read dictionary + try (ImmutableDictionaryReader dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) { + for (int i = 0; i < dictionaryReader.length(); i++) { + creator.add(dictionaryReader.get(i)); + } + } + } else { + //no dictionary: building from the forward index is not implemented + throw new UnsupportedOperationException("Bloom filters not supported for No Dictionary columns"); + } + } + + // For v3, write the generated bloom filter file into the single file and remove it. + if (_segmentVersion == SegmentVersion.v3) { + LoaderUtils.writeIndexToV3Format(_segmentWriter, column, bloomFilterFile, ColumnIndexType.BLOOM_FILTER); + } + + // Delete the marker file. 
+ FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created bloom filter for segment: {}, column: {}", _segmentName, column); + } + + + + private ImmutableDictionaryReader getDictionaryReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter) throws IOException { + PinotDataBuffer dictionaryBuffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); + int cardinality = columnMetadata.getCardinality(); + ImmutableDictionaryReader dictionaryReader; + DataType dataType = columnMetadata.getDataType(); + switch (dataType) { + case INT: + dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality); + break; + case LONG: + dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality); + break; + case FLOAT: + dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality); + break; + case DOUBLE: + dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality); + break; + case STRING: + dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(), (byte) columnMetadata.getPaddingCharacter()); + break; + default: + throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName()); + } + return dictionaryReader; + } + +} diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java new file mode 100644 index 0000000..bab69f1 --- /dev/null +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/readers/BloomFilterReader.java @@ -0,0 +1,32 @@ +package com.linkedin.pinot.core.segment.index.readers; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import org.apache.commons.io.IOUtils; + +import com.clearspring.analytics.stream.membership.BloomFilter; +import 
com.linkedin.pinot.common.data.FieldSpec.DataType; +import com.linkedin.pinot.core.segment.memory.PinotDataBuffer; +/** Deserializes a clearspring BloomFilter from a data buffer and answers membership checks against it. */ +public class BloomFilterReader { + BloomFilter _bloomFilter; + + public BloomFilterReader(PinotDataBuffer bloomFilterBuffer, DataType dataType) throws IOException{ + byte[] buffer = new byte[(int)bloomFilterBuffer.size()]; + bloomFilterBuffer.copyTo(0, buffer); + _bloomFilter = BloomFilter.deserialize(buffer); + } + /** Checks the filter for the key's toString() representation. */ + public boolean mightContain(Object key){ + return _bloomFilter.isPresent(key.toString()); + } + + public static void main(String[] args) throws Exception { + try (FileInputStream fileInputStream = new FileInputStream(new File("/home/kgopalak/pinot_perf/index_dir/tpch_lineitem_OFFLINE/tpch_lineitem_0/l_orderkey.bloom.inv"))) { + BloomFilter bloomFilter = BloomFilter.deserialize(IOUtils.toByteArray(fileInputStream)); + System.out.println(bloomFilter.buckets()); + } + } +} diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java index 3579f1c..2973969 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexDirectory.java @@ -93,7 +93,14 @@ abstract class ColumnIndexDirectory implements Closeable { */ public abstract PinotDataBuffer getInvertedIndexBufferFor(String column) throws IOException; - + /** + * Get bloom filter buffer for a column + * @param column column name + * @return in-memory ByteBuffer like buffer for data + * @throws IOException + */ + public abstract PinotDataBuffer getBloomFilterBufferFor(String column) + throws IOException; /** * Allocate a new data buffer of specified sizeBytes in the columnar index directory * @param column column name @@ -121,6 +128,15 @@ abstract class ColumnIndexDirectory implements Closeable { */ public abstract 
PinotDataBuffer newInvertedIndexBuffer(String column, long sizeBytes) throws IOException; + /** + * Allocate a new data buffer of specified sizeBytes in the columnar index directory + * @param column column name + * @param sizeBytes sizeBytes for the buffer allocation + * @return in-memory ByteBuffer like buffer for data + * @throws IOException + */ + public abstract PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes) + throws IOException; /** * Check if an index exists for a column diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java index 85d38f7..5f7e008 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/ColumnIndexType.java @@ -18,7 +18,8 @@ package com.linkedin.pinot.core.segment.store; public enum ColumnIndexType { DICTIONARY("dictionary"), FORWARD_INDEX("forward_index"), - INVERTED_INDEX("inverted_index"); + INVERTED_INDEX("inverted_index"), + BLOOM_FILTER("bloom_filter"); private final String indexName; ColumnIndexType(String name) { diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java index c6ea2d9..8599cde 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/FilePerIndexDirectory.java @@ -81,6 +81,20 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { } @Override + public PinotDataBuffer getBloomFilterBufferFor(String column) + throws IOException { + IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER); + return getReadBufferFor(key); + } + + @Override + public PinotDataBuffer newBloomFilterBuffer(String column, long 
sizeBytes) + throws IOException { + IndexKey key = new IndexKey(column, ColumnIndexType.BLOOM_FILTER); + return getWriteBufferFor(key, sizeBytes); + } + + @Override public boolean hasIndexFor(String column, ColumnIndexType type) { File indexFile = getFileFor(column, type); return indexFile.exists(); @@ -140,6 +154,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { case INVERTED_INDEX: filename = metadata.getBitmapInvertedIndexFileName(column); break; + case BLOOM_FILTER: + filename = metadata.getBloomFilterFileName(column); + break; default: throw new UnsupportedOperationException("Unknown index type: " + indexType.toString()); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java index d29c37c..235f9cf 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SegmentLocalFSDirectory.java @@ -233,6 +233,10 @@ class SegmentLocalFSDirectory extends SegmentDirectory { case INVERTED_INDEX: buffer = columnIndexDirectory.getInvertedIndexBufferFor(column); break; + case BLOOM_FILTER: + buffer = columnIndexDirectory.getBloomFilterBufferFor(column); + break; + default: throw new RuntimeException("Unknown index type: " + type.name()); } @@ -412,6 +416,8 @@ class SegmentLocalFSDirectory extends SegmentDirectory { return columnIndexDirectory.newForwardIndexBuffer(key.name, sizeBytes); case INVERTED_INDEX: return columnIndexDirectory.newInvertedIndexBuffer(key.name, sizeBytes); + case BLOOM_FILTER: + return columnIndexDirectory.newBloomFilterBuffer(key.name, sizeBytes); default: throw new RuntimeException("Unknown index type: " + indexType.name() + " for directory: " + segmentDirectory); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java index cf22cc6..5271985 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/store/SingleFileIndexDirectory.java @@ -105,6 +105,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { } @Override + public PinotDataBuffer getBloomFilterBufferFor(String column) + throws IOException { + return checkAndGetIndexBuffer(column, ColumnIndexType.BLOOM_FILTER); + } + + @Override public boolean hasIndexFor(String column, ColumnIndexType type) { IndexKey key = new IndexKey(column, type); return columnEntries.containsKey(key); @@ -128,6 +134,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { return allocNewBufferInternal(column, ColumnIndexType.INVERTED_INDEX, sizeBytes, "inverted_index.create"); } + @Override + public PinotDataBuffer newBloomFilterBuffer(String column, long sizeBytes) + throws IOException { + return allocNewBufferInternal(column, ColumnIndexType.BLOOM_FILTER, sizeBytes, "bloom_filter.create"); + } + private PinotDataBuffer checkAndGetIndexBuffer(String column, ColumnIndexType type) { IndexKey key = new IndexKey(column, type); IndexEntry entry = columnEntries.get(key); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java index aedd6db..f3d1c52 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java @@ -18,6 +18,7 @@ package com.linkedin.pinot.core.segment.virtualcolumn; import com.linkedin.pinot.core.io.reader.DataFileReader; import 
com.linkedin.pinot.core.segment.index.column.ColumnIndexContainer; import com.linkedin.pinot.core.segment.index.column.PhysicalColumnIndexContainer; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; @@ -51,4 +52,9 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer { public Dictionary getDictionary() { return _dictionary; } + + @Override + public BloomFilterReader getBloomFilter() { + return null; + } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java index b450ff5..8959969 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeDimensionDataSource.java @@ -21,6 +21,7 @@ import com.linkedin.pinot.core.common.DataSource; import com.linkedin.pinot.core.common.DataSourceMetadata; import com.linkedin.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader; import com.linkedin.pinot.core.operator.blocks.SingleValueBlock; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; import com.linkedin.pinot.core.segment.memory.PinotDataBuffer; @@ -100,6 +101,11 @@ public class StarTreeDimensionDataSource extends DataSource { } @Override + public BloomFilterReader getBloomFilter() { + return null; + } + + @Override public Dictionary getDictionary() { return _dictionary; } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java 
b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java index 31f742a..97d4ca1 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/startree/v2/store/StarTreeMetricDataSource.java @@ -24,6 +24,7 @@ import com.linkedin.pinot.core.io.reader.impl.v1.BaseChunkSingleValueReader; import com.linkedin.pinot.core.io.reader.impl.v1.FixedByteChunkSingleValueReader; import com.linkedin.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import com.linkedin.pinot.core.operator.blocks.SingleValueBlock; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.core.segment.index.readers.Dictionary; import com.linkedin.pinot.core.segment.index.readers.InvertedIndexReader; import com.linkedin.pinot.core.segment.memory.PinotDataBuffer; @@ -110,6 +111,11 @@ public class StarTreeMetricDataSource extends DataSource { } @Override + public BloomFilterReader getBloomFilter() { + return null; + } + + @Override protected Block getNextBlock() { return new SingleValueBlock(_forwardIndex, _numDocs, _dataType, null); } @@ -118,4 +124,6 @@ public class StarTreeMetricDataSource extends DataSource { public String getOperatorName() { return _operatorName; } + + } diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java index a278149..52b8542 100644 --- a/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/common/RealtimeNoDictionaryTest.java @@ -86,10 +86,10 @@ public class RealtimeNoDictionaryTest { } Map<String, DataSource> dataSourceBlock = new HashMap<>(); - dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null)); - 
dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null)); - dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null)); - dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null)); + dataSourceBlock.put(INT_COL_NAME, new ColumnDataSource(intSpec, NUM_ROWS, 0, intRawIndex, null, null, null)); + dataSourceBlock.put(LONG_COL_NAME, new ColumnDataSource(longSpec, NUM_ROWS, 0, longRawIndex, null, null, null)); + dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, NUM_ROWS, 0, floatRawIndex, null, null, null)); + dataSourceBlock.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, doubleRawIndex, null, null, null)); return new DataFetcher(dataSourceBlock); } diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java new file mode 100644 index 0000000..70ea025 --- /dev/null +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/segment/index/creator/BloomFilterCreatorTest.java @@ -0,0 +1,71 @@ +package com.linkedin.pinot.core.segment.index.creator; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.annotations.Test; + +import com.clearspring.analytics.stream.membership.BloomFilter; +import com.google.common.hash.Funnels; + +public class BloomFilterCreatorTest { + + @Test + public void compareGauvaVsClearSpring() throws Exception { + int numElementsArray[] = new 
int[] { 10000, 100000, 1000000 }; + for (int numElements : numElementsArray) { + double maxFalsePosProbability = 0.1; +// int optimalNumOfBits =com.google.common.hash.BloomFilter.optimalNumOfBits + BloomFilter bloomFilter = new BloomFilter(numElements, maxFalsePosProbability); + com.google.common.hash.BloomFilter<Integer> gauvaIntegerBloomFilter = com.google.common.hash.BloomFilter.create(Funnels.integerFunnel(), numElements, + maxFalsePosProbability); + com.google.common.hash.BloomFilter<String> gauvaStringBloomFilter = com.google.common.hash.BloomFilter + .create(Funnels.stringFunnel(Charset.forName("UTF-8")), numElements, maxFalsePosProbability); + System.out.println("BitSet size:" + bloomFilter.buckets()); + Random random = new Random(); + Set<Integer> set = new HashSet<>(); + for (int i = 0; i < 100000; i++) { + int id = random.nextInt(500000000); + set.add(id); + bloomFilter.add(String.valueOf(id)); + gauvaIntegerBloomFilter.put(id); + gauvaStringBloomFilter.put(String.valueOf(id)); + } + byte[] buffer1 = BloomFilter.serialize(bloomFilter); + ByteArrayOutputStream intBuffer = new ByteArrayOutputStream(); + gauvaIntegerBloomFilter.writeTo(intBuffer); + ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream(); + gauvaStringBloomFilter.writeTo(stringBuffer); + //get the internal bitset and copy the index of every set bit into a roaring bitmap + System.out.println(Arrays.toString(BloomFilter.class.getFields())); + Field field = BloomFilter.class.getDeclaredField("filter_"); + field.setAccessible(true); + BitSet bitset = (BitSet) field.get(bloomFilter); + MutableRoaringBitmap roaringBitmap = new MutableRoaringBitmap(); + for (int bit = bitset.nextSetBit(0); bit >= 0; bit = bitset.nextSetBit(bit + 1)) { + roaringBitmap.add(bit); + } + int roaringBitmapSize = roaringBitmap.serializedSizeInBytes(); + System.out.println("numBits:" + bitset.length()); + System.out.println("numBitsSet:" + bitset.cardinality()); + System.out.println("numElements:" + numElements); + System.out.println("clear spring size:" + buffer1.length); + System.out.println("Gauva int size:" + 
intBuffer.size()); + System.out.println("Gauva string size:" + stringBuffer.size()); + System.out.println("Roaring bitmap size:" + roaringBitmapSize); + System.out.println("\n"); + BitSet bitSet; + } + + } +} diff --git a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java index 22fc481..1e3fba1 100644 --- a/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/query/pruner/ColumnValueSegmentPrunerTest.java @@ -21,6 +21,7 @@ import com.linkedin.pinot.common.utils.request.FilterQueryTree; import com.linkedin.pinot.common.utils.request.RequestUtils; import com.linkedin.pinot.core.query.pruner.ColumnValueSegmentPruner; import com.linkedin.pinot.core.segment.index.ColumnMetadata; +import com.linkedin.pinot.core.segment.index.readers.BloomFilterReader; import com.linkedin.pinot.pql.parsers.Pql2Compiler; import java.util.HashMap; import java.util.Map; @@ -35,6 +36,7 @@ import org.testng.annotations.Test; public class ColumnValueSegmentPrunerTest { private static final Pql2Compiler COMPILER = new Pql2Compiler(); private static final Map<String, ColumnMetadata> COLUMN_METADATA_MAP = new HashMap<>(); + private static final Map<String, BloomFilterReader> BLOOM_FILTER_MAP = new HashMap<>(); static { COLUMN_METADATA_MAP.put("time", new ColumnMetadata.Builder().setColumnName("time") @@ -95,6 +97,6 @@ public class ColumnValueSegmentPrunerTest { BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query); FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest); ColumnValueSegmentPruner pruner = new ColumnValueSegmentPruner(); - return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP); + return pruner.pruneSegment(filterQueryTree, COLUMN_METADATA_MAP, BLOOM_FILTER_MAP); } } diff --git 
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java index 999f022..2368d8a 100644 --- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java +++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkDriver.java @@ -16,6 +16,7 @@ package com.linkedin.pinot.tools.perf; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter; import com.linkedin.pinot.common.config.TableConfig; import com.linkedin.pinot.common.config.TableNameBuilder; @@ -259,10 +260,10 @@ public class PerfBenchmarkDriver { public void configureTable(String tableName) throws Exception { - configureTable(tableName, null); + configureTable(tableName, null, null); } - public void configureTable(String tableName, List<String> invertedIndexColumns) + public void configureTable(String tableName, List<String> invertedIndexColumns, List<String> bloomFilterColumns) throws Exception { TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName) .setSegmentAssignmentStrategy(_segmentAssignmentStrategy) @@ -272,6 +273,7 @@ public class PerfBenchmarkDriver { .setLoadMode(_loadMode) .setSegmentVersion(_segmentFormatVersion) .setInvertedIndexColumns(invertedIndexColumns) + .setBloomFilterColumns(bloomFilterColumns) .build(); _helixResourceManager.addTable(tableConfig); } diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java index ac124fe..17fec66 100644 --- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java +++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/perf/PerfBenchmarkRunner.java @@ -72,6 +72,10 @@ public class PerfBenchmarkRunner extends 
AbstractBaseCommand implements Command usage = "Comma separated inverted index columns to be created (non-batch load).") private String _invertedIndexColumns; + @Option(name = "-bloomFilterColumns", required = false, metaVar = "<String>", + usage = "Comma separated bloom filter columns to be created (non-batch load).") + private String _bloomFilterColumns; + @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.") private boolean _help = false; @@ -132,7 +136,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command @Override public void run() { try { - loadTable(driver, _dataDir, tableName, null); + loadTable(driver, _dataDir, tableName, null, null); } catch (Exception e) { LOGGER.error("Caught exception while loading table: {}", tableName, e); } @@ -146,14 +150,18 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command if (_invertedIndexColumns != null) { invertedIndexColumns = Arrays.asList(_invertedIndexColumns.split(",")); } + List<String> bloomFilterColumns = null; + if (_bloomFilterColumns != null) { + bloomFilterColumns = Arrays.asList(_bloomFilterColumns.split(",")); + } for (String tableName : _tableNames.split(",")) { - loadTable(driver, _dataDir, tableName, invertedIndexColumns); + loadTable(driver, _dataDir, tableName, invertedIndexColumns, bloomFilterColumns); } } } public static void loadTable(PerfBenchmarkDriver driver, String dataDir, String tableName, - List<String> invertedIndexColumns) + List<String> invertedIndexColumns, List<String> bloomFilterColumns) throws Exception { boolean tableConfigured = false; File[] segments = new File(dataDir, tableName).listFiles(); @@ -161,7 +169,7 @@ public class PerfBenchmarkRunner extends AbstractBaseCommand implements Command for (File segment : segments) { SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(segment); if (!tableConfigured) { - 
driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns); + driver.configureTable(segmentMetadata.getTableName(), invertedIndexColumns, bloomFilterColumns); tableConfigured = true; } driver.addSegment(segmentMetadata); diff --git a/pom.xml b/pom.xml index a482f7b..217eed0 100644 --- a/pom.xml +++ b/pom.xml @@ -874,6 +874,7 @@ <configuration> <source>${jdk.version}</source> <target>${jdk.version}</target> + <compilerVersion>${jdk.version}</compilerVersion> </configuration> </plugin> <plugin> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
