This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 02d1c128f9 CLP as a compressionCodec (#12504)
02d1c128f9 is described below
commit 02d1c128f9b97b12d3f4c7d10f432ffabcc286f1
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Mar 21 15:58:48 2024 +0530
CLP as a compressionCodec (#12504)
---------
Co-authored-by: Saurabh Dubey
<[email protected]>
Co-authored-by: Saurabh Dubey <[email protected]>
Co-authored-by: Seunghyun Lee <[email protected]>
---
.../tests/CLPEncodingRealtimeIntegrationTest.java | 150 ++++++++++++
.../src/test/resources/clpEncodingITData.tar.gz | Bin 0 -> 1863 bytes
...clpEncodingRealtimeIntegrationTestSchema.schema | 18 ++
pinot-segment-local/pom.xml | 4 +
.../local/io/util/VarLengthValueReader.java | 11 +
.../writer/impl/FixedBitMVForwardIndexWriter.java | 2 +-
.../writer/impl/FixedBitSVForwardIndexWriter.java | 2 +-
.../stats/MutableNoDictionaryColStatistics.java | 13 +-
.../impl/forward/CLPMutableForwardIndex.java | 178 ++++++++++++++
.../creator/impl/SegmentDictionaryCreator.java | 9 +
.../creator/impl/fwd/CLPForwardIndexCreatorV1.java | 272 +++++++++++++++++++++
.../fwd/MultiValueFixedByteRawIndexCreator.java | 15 +-
.../stats/AbstractColumnStatisticsCollector.java | 3 +
.../creator/impl/stats/CLPStatsProvider.java | 66 +++++
.../stats/StringColumnPreIndexStatsCollector.java | 97 +++++++-
.../index/forward/ForwardIndexCreatorFactory.java | 5 +
.../index/forward/ForwardIndexReaderFactory.java | 10 +
.../segment/index/forward/ForwardIndexType.java | 6 +-
.../readers/forward/CLPForwardIndexReaderV1.java | 219 +++++++++++++++++
.../segment/local/utils/TableConfigUtils.java | 10 +-
.../local/segment/creator/DictionariesTest.java | 51 +++-
.../index/creator/CLPForwardIndexCreatorTest.java | 117 +++++++++
.../mutable/CLPMutableForwardIndexTest.java | 104 ++++++++
.../spi/creator/ColumnIndexCreationInfo.java | 4 +
.../segment/spi/creator/IndexCreationContext.java | 24 +-
.../spi/creator/SegmentGeneratorConfig.java | 3 +-
.../segment/spi/creator/StatsCollectorConfig.java | 14 ++
.../segment/spi/index/ForwardIndexConfig.java | 1 +
.../apache/pinot/spi/config/table/FieldConfig.java | 3 +
29 files changed, 1388 insertions(+), 23 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
new file mode 100644
index 0000000000..c8b5d88646
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTestSet {
+ private List<File> _avroFiles;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+ _avroFiles = unpackAvroData(_tempDir);
+
+ // Start the Pinot cluster
+ startZk();
+ // Start a customized controller with more frequent realtime segment
validation
+ startController();
+ startBroker();
+ startServers(1);
+
+ startKafka();
+ pushAvroIntoKafka(_avroFiles);
+
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
+ addTableConfig(tableConfig);
+
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getInvertedIndexColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getRangeIndexColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected List<String> getBloomFilterColumns() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected String getSortedColumn() {
+ return null;
+ }
+
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ return Collections.singletonList("logLine");
+ }
+
+ @Test
+ public void testValues()
+ throws Exception {
+ Assert.assertEquals(getPinotConnection().execute(
+ "SELECT count(*) FROM " + getTableName() + " WHERE
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
+ .getLong(0), 53);
+ }
+
+ protected int getRealtimeSegmentFlushSize() {
+ return 30;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return 100;
+ }
+
+ @Override
+ protected String getTableName() {
+ return "clpEncodingIT";
+ }
+
+ @Override
+ protected String getAvroTarFileName() {
+ return "clpEncodingITData.tar.gz";
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "clpEncodingRealtimeIntegrationTestSchema.schema";
+ }
+
+ @Override
+ protected String getTimeColumnName() {
+ return "timestampInEpoch";
+ }
+
+ @Override
+ protected List<FieldConfig> getFieldConfigs() {
+ List<FieldConfig> fieldConfigs = new ArrayList<>();
+ fieldConfigs.add(
+ new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null,
FieldConfig.CompressionCodec.CLP, null,
+ null, null, null));
+
+ return fieldConfigs;
+ }
+
+ @Override
+ protected IngestionConfig getIngestionConfig() {
+ List<TransformConfig> transforms = new ArrayList<>();
+ transforms.add(new TransformConfig("timestampInEpoch", "now()"));
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transforms);
+
+ return ingestionConfig;
+ }
+}
diff --git
a/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz
b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz
new file mode 100644
index 0000000000..c2f5baaa5f
Binary files /dev/null and
b/pinot-integration-tests/src/test/resources/clpEncodingITData.tar.gz differ
diff --git
a/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
new file mode 100644
index 0000000000..9aa38a1a19
--- /dev/null
+++
b/pinot-integration-tests/src/test/resources/clpEncodingRealtimeIntegrationTestSchema.schema
@@ -0,0 +1,18 @@
+{
+ "schemaName": "clpEncodingIT",
+ "dimensionFieldSpecs": [
+ {
+ "name": "logLine",
+ "dataType": "STRING"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "notNull": false,
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml
index 27a7dcd8c6..ac06098c24 100644
--- a/pinot-segment-local/pom.xml
+++ b/pinot-segment-local/pom.xml
@@ -151,5 +151,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.yscope.clp</groupId>
+ <artifactId>clp-ffi</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
index cd1346896a..d9384bf16f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.segment.local.io.util;
+import java.util.List;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -101,6 +103,15 @@ public class VarLengthValueReader implements ValueReader {
return new String(buffer, 0, length, UTF_8);
}
+  /**
+   * Appends to {@code rangeList} the byte ranges touched when reading the value at {@code index}.
+   * <p>Two ranges are added, both shifted by {@code baseOffset}:
+   * <ol>
+   *   <li>the two consecutive int offset entries (start and end) stored for this index</li>
+   *   <li>the value bytes between those start and end offsets</li>
+   * </ol>
+   *
+   * @param index index of the value
+   * @param baseOffset offset added to every recorded position
+   * @param rangeList list the ranges are appended to
+   */
+  public void recordOffsetRanges(int index, long baseOffset, List<ForwardIndexReader.ByteRange> rangeList) {
+    final int offsetEntryPos = _dataSectionStartOffSet + index * Integer.BYTES;
+    final int valueStart = _dataBuffer.getInt(offsetEntryPos);
+    final int valueEnd = _dataBuffer.getInt(offsetEntryPos + Integer.BYTES);
+    // Start and end offsets are adjacent, so one range of two ints covers both entries
+    rangeList.add(new ForwardIndexReader.ByteRange(baseOffset + offsetEntryPos, Integer.BYTES * 2));
+    rangeList.add(new ForwardIndexReader.ByteRange(baseOffset + valueStart, valueEnd - valueStart));
+  }
+
@Override
public String getPaddedString(int index, int numBytesPerValue, byte[]
buffer) {
throw new UnsupportedOperationException();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
index 650a75e28f..fd46a843ee 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVForwardIndexWriter.java
@@ -75,7 +75,7 @@ public class FixedBitMVForwardIndexWriter implements
Closeable {
private int _nextDocId = 0;
public FixedBitMVForwardIndexWriter(File file, int numDocs, int
totalNumValues, int numBitsPerValue)
- throws Exception {
+ throws IOException {
float averageValuesPerDoc = totalNumValues / numDocs;
_docsPerChunk = (int) (Math.ceil(PREFERRED_NUM_VALUES_PER_CHUNK /
averageValuesPerDoc));
_numChunks = (numDocs + _docsPerChunk - 1) / _docsPerChunk;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
index b7c8c55c5b..f6712b38bf 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitSVForwardIndexWriter.java
@@ -37,7 +37,7 @@ public class FixedBitSVForwardIndexWriter implements
Closeable {
private int _nextDocId = 0;
public FixedBitSVForwardIndexWriter(File file, int numDocs, int
numBitsPerValue)
- throws Exception {
+ throws IOException {
// Convert to long in order to avoid int overflow
long length = ((long) numDocs * numBitsPerValue + Byte.SIZE - 1) /
Byte.SIZE;
// Backward-compatible: index file is always big-endian
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 0cf4c37b96..5f2d98893b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -21,6 +21,8 @@ package
org.apache.pinot.segment.local.realtime.converter.stats;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Set;
+import
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
@@ -30,7 +32,7 @@ import
org.apache.pinot.segment.spi.partition.PartitionFunction;
import static org.apache.pinot.segment.spi.Constants.UNKNOWN_CARDINALITY;
-public class MutableNoDictionaryColStatistics implements ColumnStatistics {
+public class MutableNoDictionaryColStatistics implements ColumnStatistics,
CLPStatsProvider {
private final DataSourceMetadata _dataSourceMetadata;
private final MutableForwardIndex _forwardIndex;
@@ -111,4 +113,13 @@ public class MutableNoDictionaryColStatistics implements
ColumnStatistics {
public Set<Integer> getPartitions() {
return _dataSourceMetadata.getPartitions();
}
+
+ @Override
+ public CLPStats getCLPStats() {
+ if (_forwardIndex instanceof CLPMutableForwardIndex) {
+ return ((CLPMutableForwardIndex) _forwardIndex).getCLPStats();
+ }
+ throw new IllegalStateException(
+ "CLP stats not available for column: " +
_dataSourceMetadata.getFieldSpec().getName());
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
new file mode 100644
index 0000000000..bed7a4e9be
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/CLPMutableForwardIndex.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import
org.apache.pinot.segment.local.realtime.impl.dictionary.StringOffHeapMutableDictionary;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
+import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
+import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class CLPMutableForwardIndex implements MutableForwardIndex {
+ // TODO: We can get better dynamic estimates using segment stats
+ private static final int ESTIMATED_LOG_TYPE_CARDINALITY = 10000;
+ private static final int ESTIMATED_DICT_VARS_CARDINALITY = 10000;
+ private static final int ESTIMATED_LOG_TYPE_LENGTH = 200;
+ private static final int ESTIMATED_DICT_VARS_LENGTH = 50;
+ private FieldSpec.DataType _storedType;
+ private final EncodedMessage _clpEncodedMessage;
+ private final MessageEncoder _clpMessageEncoder;
+ private final MessageDecoder _clpMessageDecoder;
+ private final MutableDictionary _logTypeDictCreator;
+ private final MutableDictionary _dictVarsDictCreator;
+ private final FixedByteSVMutableForwardIndex _logTypeFwdIndex;
+ private final FixedByteMVMutableForwardIndex _dictVarsFwdIndex;
+ private final FixedByteMVMutableForwardIndex _encodedVarsFwdIndex;
+
+ // clp stats
+ int _totalNumberOfDictVars = 0;
+ int _maxNumberOfEncodedVars = 0;
+ int _totalNumberOfEncodedVars = 0;
+ private int _lengthOfShortestElement;
+ private int _lengthOfLongestElement;
+
+ public CLPMutableForwardIndex(String columnName, FieldSpec.DataType
storedType,
+ PinotDataBufferMemoryManager memoryManager, int capacity) {
+ _clpEncodedMessage = new EncodedMessage();
+ _clpMessageEncoder = new
MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ _logTypeDictCreator =
+ new StringOffHeapMutableDictionary(ESTIMATED_LOG_TYPE_CARDINALITY,
ESTIMATED_LOG_TYPE_CARDINALITY / 10,
+ memoryManager, columnName + "_logType.dict",
ESTIMATED_LOG_TYPE_LENGTH);
+ _dictVarsDictCreator =
+ new StringOffHeapMutableDictionary(ESTIMATED_DICT_VARS_CARDINALITY,
ESTIMATED_DICT_VARS_CARDINALITY / 10,
+ memoryManager, columnName + "_dictVars.dict",
ESTIMATED_DICT_VARS_LENGTH);
+
+ _logTypeFwdIndex = new FixedByteSVMutableForwardIndex(true,
FieldSpec.DataType.INT, capacity, memoryManager,
+ columnName + "_logType.fwd");
+ _dictVarsFwdIndex =
+ new
FixedByteMVMutableForwardIndex(ForwardIndexType.MAX_MULTI_VALUES_PER_ROW, 20,
capacity, Integer.BYTES,
+ memoryManager, columnName + "_dictVars.fwd", true,
FieldSpec.DataType.INT);
+ _encodedVarsFwdIndex =
+ new
FixedByteMVMutableForwardIndex(ForwardIndexType.MAX_MULTI_VALUES_PER_ROW, 20,
capacity, Long.BYTES,
+ memoryManager, columnName + "_encodedVars.fwd", true,
FieldSpec.DataType.LONG);
+ _clpMessageDecoder = new
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ _storedType = storedType;
+ }
+
+ @Override
+ public int getLengthOfShortestElement() {
+ return _lengthOfShortestElement;
+ }
+
+ @Override
+ public int getLengthOfLongestElement() {
+ return _lengthOfLongestElement;
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return true;
+ }
+
+ @Override
+ public FieldSpec.DataType getStoredType() {
+ return _storedType;
+ }
+
+ @Override
+ public void setString(int docId, String value) {
+ String logtype;
+ String[] dictVars;
+ Long[] encodedVars;
+
+ _lengthOfLongestElement = Math.max(_lengthOfLongestElement,
value.length());
+ _lengthOfShortestElement = Math.min(_lengthOfShortestElement,
value.length());
+
+ try {
+ _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+ logtype = _clpEncodedMessage.getLogTypeAsString();
+ dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+ encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to encode message: " + value,
e);
+ }
+
+ _totalNumberOfDictVars += dictVars.length;
+ _totalNumberOfEncodedVars += encodedVars.length;
+ _maxNumberOfEncodedVars = Math.max(_maxNumberOfEncodedVars,
encodedVars.length);
+
+ int logTypeDictId = _logTypeDictCreator.index(logtype);
+ _logTypeFwdIndex.setDictId(docId, logTypeDictId);
+
+ int[] dictVarsDictIds = new int[dictVars.length];
+ for (int i = 0; i < dictVars.length; i++) {
+ dictVarsDictIds[i] = _dictVarsDictCreator.index(dictVars[i]);
+ }
+ _dictVarsFwdIndex.setDictIdMV(docId, dictVarsDictIds);
+
+ long[] encodedVarsLongs = new long[encodedVars.length];
+ for (int i = 0; i < encodedVars.length; i++) {
+ encodedVarsLongs[i] = encodedVars[i];
+ }
+ _encodedVarsFwdIndex.setLongMV(docId, encodedVarsLongs);
+ }
+
+ @Override
+ public String getString(int docId) {
+ String logType =
_logTypeDictCreator.getStringValue(_logTypeFwdIndex.getDictId(docId));
+ int[] dictVarsDictIds = _dictVarsFwdIndex.getDictIdMV(docId);
+ String[] dictVars = new String[dictVarsDictIds.length];
+ for (int i = 0; i < dictVarsDictIds.length; i++) {
+ dictVars[i] = _dictVarsDictCreator.getStringValue(dictVarsDictIds[i]);
+ }
+ long[] encodedVarsLongs = _encodedVarsFwdIndex.getLongMV(docId);
+ try {
+ return _clpMessageDecoder.decodeMessage(logType, dictVars,
encodedVarsLongs);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to encode message: " +
logType, e);
+ }
+ }
+
+ public StringColumnPreIndexStatsCollector.CLPStats getCLPStats() {
+ return new CLPStatsProvider.CLPStats((String[])
_logTypeDictCreator.getSortedValues(),
+ (String[]) _dictVarsDictCreator.getSortedValues(),
_totalNumberOfDictVars, _totalNumberOfEncodedVars,
+ _maxNumberOfEncodedVars);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _logTypeDictCreator.close();
+ _dictVarsDictCreator.close();
+ _logTypeFwdIndex.close();
+ _dictVarsFwdIndex.close();
+ _encodedVarsFwdIndex.close();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
index 96fc96cee4..1dfb65adea 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
@@ -70,12 +70,21 @@ public class SegmentDictionaryCreator implements
IndexCreator {
private Object2IntOpenHashMap<Object> _objectValueToIndexMap;
private int _numBytesPerEntry = 0;
+  /**
+   * Creates a dictionary creator that writes to an explicitly given file, rather than deriving the file name from a
+   * {@link FieldSpec} and index directory.
+   *
+   * @param columnName name recorded for the dictionary
+   * @param storedType stored data type of the dictionary values
+   * @param indexFile file the dictionary is written to
+   * @param useVarLengthDictionary whether to build a variable-length dictionary
+   */
+  public SegmentDictionaryCreator(String columnName, DataType storedType, File indexFile,
+      boolean useVarLengthDictionary) {
+    this._dictionaryFile = indexFile;
+    this._columnName = columnName;
+    this._storedType = storedType;
+    this._useVarLengthDictionary = useVarLengthDictionary;
+  }
+
public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir, boolean
useVarLengthDictionary) {
_columnName = fieldSpec.getName();
_storedType = fieldSpec.getDataType().getStoredType();
_dictionaryFile = new File(indexDir, _columnName +
DictionaryIndexType.getFileExtension());
_useVarLengthDictionary = useVarLengthDictionary;
}
+
@Override
public void add(@Nonnull Object value, int dictId)
throws IOException {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
new file mode 100644
index 0000000000..c681265ffb
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Writer for the CLP forward index.
+ * <p>Each value passed to {@link #putString(String)} is CLP-encoded into a log type, dictionary variables and
+ * encoded variables. These are written to intermediate files in a temporary directory. {@link #seal()} then
+ * concatenates them into the single forward index file, laid out as:
+ * <ul>
+ * <li>Header: MAGIC_BYTES, followed by 9 big-endian ints. These are the version, the total number of dictVars, the
+ * bytes per entry of the logType and dictVars dictionaries, and the byte length of each of the five sections
+ * below (in order).</li>
+ * <li>LogType dictionary: dictionary for logType column</li>
+ * <li>DictVars dictionary: dictionary for dictVars column</li>
+ * <li>LogType fwd index: fwd index for logType column</li>
+ * <li>DictVars fwd index: fwd index for dictVars column</li>
+ * <li>EncodedVars fwd index: raw fwd index for encodedVars column</li>
+ * </ul>
+ */
+
+public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
+  public static final byte[] MAGIC_BYTES = "CLP.v1".getBytes(StandardCharsets.UTF_8);
+  private static final int VERSION = 1;
+  // Ints following MAGIC_BYTES: version, totalNumberOfDictVars, 2 bytes-per-entry values and 5 section lengths
+  private static final int NUM_HEADER_INTS = 9;
+  private final String _column;
+  private final int _numDocs;
+  private final File _intermediateFilesDir;
+  private final FileChannel _dataFile;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+  private final StringColumnPreIndexStatsCollector.CLPStats _clpStats;
+  private final SegmentDictionaryCreator _logTypeDictCreator;
+  private final SegmentDictionaryCreator _dictVarsDictCreator;
+  private final FixedBitSVForwardIndexWriter _logTypeFwdIndexWriter;
+  private final FixedBitMVForwardIndexWriter _dictVarsFwdIndexWriter;
+  private final MultiValueFixedByteRawIndexCreator _encodedVarsFwdIndexWriter;
+  private final File _logTypeDictFile;
+  private final File _dictVarsDictFile;
+  private final File _logTypeFwdIndexFile;
+  private final File _dictVarsFwdIndexFile;
+  private final File _encodedVarsFwdIndexFile;
+
+  /**
+   * @param baseIndexDir directory holding the final index file and the temporary intermediate directory
+   * @param column column name, used to derive file names
+   * @param numDocs number of documents that will be added
+   * @param columnStatistics column stats; must implement {@link CLPStatsProvider}
+   */
+  public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int numDocs, ColumnStatistics columnStatistics)
+      throws IOException {
+    _column = column;
+    _numDocs = numDocs;
+    _intermediateFilesDir =
+        new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    // The final file is written sequentially in seal(); nothing is pre-allocated or memory-mapped here
+    _dataFile =
+        new RandomAccessFile(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+
+    CLPStatsProvider statsCollector = (CLPStatsProvider) columnStatistics;
+    _clpStats = statsCollector.getCLPStats();
+    _logTypeDictFile = new File(_intermediateFilesDir, _column + "_clp_logtype.dict");
+    _logTypeDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_logtype.dict", FieldSpec.DataType.STRING, _logTypeDictFile, true);
+    _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());
+
+    _dictVarsDictFile = new File(_intermediateFilesDir, _column + "_clp_dictvars.dict");
+    _dictVarsDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_dictvars.dict", FieldSpec.DataType.STRING, _dictVarsDictFile,
+            true);
+    _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());
+
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_logtype.fwd");
+    _logTypeFwdIndexWriter = new FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs,
+        PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 1));
+
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_dictvars.fwd");
+    _dictVarsFwdIndexWriter =
+        new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, _clpStats.getTotalNumberOfDictVars(),
+            PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 1));
+
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexWriter =
+        new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, ChunkCompressionType.LZ4, numDocs,
+            FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), false,
+            VarByteChunkForwardIndexWriterV4.VERSION);
+    _clpStats.clear();
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public void putBigDecimal(BigDecimal value) {
+    throw new UnsupportedOperationException("Non string types are not supported");
+  }
+
+  /**
+   * CLP-encodes {@code value} and appends its components to the intermediate writers. A missing log type,
+   * dictVars or encodedVars component is replaced by a single default-null placeholder.
+   *
+   * @throws IllegalArgumentException if the CLP encoder fails on the value
+   */
+  @Override
+  public void putString(String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, e);
+    }
+
+    if (logtype == null) {
+      logtype = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+    }
+
+    if (dictVars == null) {
+      dictVars = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+    }
+
+    if (encodedVars == null) {
+      encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+    }
+
+    addCLPFields(logtype, dictVars, encodedVars);
+  }
+
+  // Maps the log type and dictVars to their dict ids and writes all three components for the next document
+  private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars) {
+    int logTypeDictId = _logTypeDictCreator.indexOfSV(logtype);
+    int[] dictVarDictIds = _dictVarsDictCreator.indexOfMV(dictVars);
+
+    _logTypeFwdIndexWriter.putDictId(logTypeDictId);
+    _dictVarsFwdIndexWriter.putDictIds(dictVarDictIds);
+
+    long[] encodedVarsUnboxed = new long[encodedVars.length];
+    for (int i = 0; i < encodedVars.length; i++) {
+      encodedVarsUnboxed[i] = encodedVars[i];
+    }
+    _encodedVarsFwdIndexWriter.putLongMV(encodedVarsUnboxed);
+  }
+
+  /**
+   * Finalizes the intermediate files and writes the header followed by all five sections into the index file.
+   *
+   * @throws IllegalStateException if the resulting index would exceed {@link Integer#MAX_VALUE} bytes
+   */
+  @Override
+  public void seal()
+      throws IOException {
+    // Close every intermediate writer first so the section files are complete and their lengths are final
+    _logTypeDictCreator.seal();
+    _logTypeDictCreator.close();
+
+    _dictVarsDictCreator.seal();
+    _dictVarsDictCreator.close();
+
+    _logTypeFwdIndexWriter.close();
+    _dictVarsFwdIndexWriter.close();
+    _encodedVarsFwdIndexWriter.close();
+
+    // Section order here defines both the header length order and the payload order
+    File[] sections = {
+        _logTypeDictFile, _dictVarsDictFile, _logTypeFwdIndexFile, _dictVarsFwdIndexFile, _encodedVarsFwdIndexFile
+    };
+
+    ByteBuffer header = ByteBuffer.allocate(MAGIC_BYTES.length + NUM_HEADER_INTS * Integer.BYTES);
+    header.put(MAGIC_BYTES);
+    header.putInt(VERSION);
+    header.putInt(_clpStats.getTotalNumberOfDictVars());
+    header.putInt(_logTypeDictCreator.getNumBytesPerEntry());
+    header.putInt(_dictVarsDictCreator.getNumBytesPerEntry());
+    long totalSize = header.capacity();
+    for (File section : sections) {
+      long sectionLength = section.length();
+      totalSize += sectionLength;
+      header.putInt((int) sectionLength);
+    }
+    // Lengths are stored as ints, so refuse to write an index whose sizes would be silently truncated
+    if (totalSize > Integer.MAX_VALUE) {
+      throw new IllegalStateException(
+          "CLP forward index for column: " + _column + " is too large: " + totalSize + " bytes");
+    }
+    header.flip();
+
+    while (header.hasRemaining()) {
+      _dataFile.write(header);
+    }
+    for (File section : sections) {
+      appendFile(section);
+    }
+    // The "rw" open does not truncate a pre-existing file, so drop anything past what was just written
+    _dataFile.truncate(totalSize);
+  }
+
+  // Appends the full contents of the given file at the data file's current position
+  private void appendFile(File file)
+      throws IOException {
+    try (FileChannel from = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      long size = from.size();
+      long transferred = 0;
+      while (transferred < size) {
+        transferred += from.transferTo(transferred, size - transferred, _dataFile);
+      }
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Delete all temp files, even if closing the data file fails
+    try {
+      _dataFile.close();
+    } finally {
+      FileUtils.deleteDirectory(_intermediateFilesDir);
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index a1ba59f3b2..d459db1f6f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -73,15 +73,22 @@ public class MultiValueFixedByteRawIndexCreator implements
ForwardIndexCreator {
int totalDocs, DataType valueType, int maxNumberOfMultiValueElements,
boolean deriveNumDocsPerChunk,
int writerVersion)
throws IOException {
- File file = new File(baseIndexDir, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ this(new File(baseIndexDir, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION), compressionType, totalDocs,
+ valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion);
+ }
+
+ public MultiValueFixedByteRawIndexCreator(File indexFile,
ChunkCompressionType compressionType, int totalDocs,
+ DataType valueType, int maxNumberOfMultiValueElements, boolean
deriveNumDocsPerChunk, int writerVersion)
+ throws IOException {
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements *
valueType.getStoredType().size());
int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(
TARGET_MAX_CHUNK_SIZE / (totalMaxLength +
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1)
: DEFAULT_NUM_DOCS_PER_CHUNK;
- _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ?
new VarByteChunkForwardIndexWriter(file,
- compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
writerVersion)
- : new VarByteChunkForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
+ _indexWriter =
+ writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new
VarByteChunkForwardIndexWriter(indexFile,
+ compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
writerVersion)
+ : new VarByteChunkForwardIndexWriterV4(indexFile, compressionType,
TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 5af1e14999..41704b9bd4 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -27,6 +27,7 @@ import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -44,6 +45,7 @@ import org.apache.pinot.spi.data.FieldSpec;
public abstract class AbstractColumnStatisticsCollector implements
ColumnStatistics {
protected static final int INITIAL_HASH_SET_SIZE = 1000;
protected final FieldSpec _fieldSpec;
+ protected final FieldConfig _fieldConfig;
private final Map<String, String> _partitionFunctionConfig;
private final PartitionFunction _partitionFunction;
@@ -57,6 +59,7 @@ public abstract class AbstractColumnStatisticsCollector
implements ColumnStatist
public AbstractColumnStatisticsCollector(String column, StatsCollectorConfig
statsCollectorConfig) {
_fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column);
+ _fieldConfig = statsCollectorConfig.getFieldConfigForColumn(column);
Preconditions.checkArgument(_fieldSpec != null, "Failed to find column:
%s", column);
if (!_fieldSpec.isSingleValueField()) {
_sorted = false;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
new file mode 100644
index 0000000000..b861188620
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/CLPStatsProvider.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+/**
+ * Exposes statistics about the CLP encoding of a column's values (implemented, for example, by
+ * {@code StringColumnPreIndexStatsCollector}).
+ */
+public interface CLPStatsProvider {
+
+  /**
+   * @return the CLP statistics gathered for the column
+   */
+  CLPStats getCLPStats();
+
+  /**
+   * Holder for CLP statistics.
+   * <p>
+   * It contains the sorted distinct logType and dictionary-variable values, the total number of dictionary and
+   * encoded variables, and the largest number of encoded variables seen in a single value.
+   */
+  class CLPStats {
+    int _totalNumberOfDictVars;
+    int _totalNumberOfEncodedVars;
+    int _maxNumberOfEncodedVars;
+    private String[] _sortedLogTypeValues;
+    private String[] _sortedDictVarValues;
+
+    public CLPStats(String[] sortedLogTypeValues, String[] sortedDictVarValues, int totalNumberOfDictVars,
+        int totalNumberOfEncodedVars, int maxNumberOfEncodedVars) {
+      this._sortedLogTypeValues = sortedLogTypeValues;
+      this._sortedDictVarValues = sortedDictVarValues;
+      this._totalNumberOfDictVars = totalNumberOfDictVars;
+      this._totalNumberOfEncodedVars = totalNumberOfEncodedVars;
+      this._maxNumberOfEncodedVars = maxNumberOfEncodedVars;
+    }
+
+    /**
+     * Drops the references to the sorted value arrays so they can be garbage collected.
+     * <p>
+     * The counters are kept.
+     */
+    public void clear() {
+      this._sortedLogTypeValues = null;
+      this._sortedDictVarValues = null;
+    }
+
+    public String[] getSortedLogTypeValues() {
+      return this._sortedLogTypeValues;
+    }
+
+    public String[] getSortedDictVarValues() {
+      return this._sortedDictVarValues;
+    }
+
+    public int getTotalNumberOfDictVars() {
+      return this._totalNumberOfDictVars;
+    }
+
+    public int getTotalNumberOfEncodedVars() {
+      return this._totalNumberOfEncodedVars;
+    }
+
+    public int getMaxNumberOfEncodedVars() {
+      return this._maxNumberOfEncodedVars;
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index b896ac30ff..d606c784d2 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -18,24 +18,35 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;
+import com.google.common.annotations.VisibleForTesting;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import static java.nio.charset.StandardCharsets.UTF_8;
-public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCollector {
+public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatisticsCollector implements CLPStatsProvider {
private Set<String> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
private int _maxRowLength = 0;
private String[] _sortedValues;
private boolean _sealed = false;
+ private CLPStatsCollector _clpStatsCollector;
+  /**
+   * Creates a statistics collector for a string column.
+   * <p>
+   * When the column's field config selects the CLP compression codec, a {@link CLPStatsCollector} is created as
+   * well, so CLP statistics are gathered alongside the regular ones.
+   */
public StringColumnPreIndexStatsCollector(String column,
StatsCollectorConfig statsCollectorConfig) {
super(column, statsCollectorConfig);
+    // _fieldConfig can be null (presumably when the column has no FieldConfig entry); in that case no CLP stats
+    // are collected
+ if (_fieldConfig != null && _fieldConfig.getCompressionCodec() ==
FieldConfig.CompressionCodec.CLP) {
+ _clpStatsCollector = new CLPStatsCollector();
+ }
}
@Override
@@ -48,6 +59,9 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
for (Object obj : values) {
String value = (String) obj;
_values.add(value);
+ if (_clpStatsCollector != null) {
+ _clpStatsCollector.collect(value);
+ }
int length = value.getBytes(UTF_8).length;
_minLength = Math.min(_minLength, length);
@@ -61,6 +75,9 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
} else {
String value = (String) entry;
addressSorted(value);
+ if (_clpStatsCollector != null) {
+ _clpStatsCollector.collect(value);
+ }
if (_values.add(value)) {
if (isPartitionEnabled()) {
updatePartition(value);
@@ -74,6 +91,14 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
}
}
+  /**
+   * Returns the CLP statistics collected for this column.
+   *
+   * @throws IllegalStateException if the collector has not been sealed yet, or if the column is not configured with
+   *     the CLP compression codec (in which case no CLP statistics were collected)
+   */
+  @Override
+  public CLPStats getCLPStats() {
+    if (!_sealed) {
+      throw new IllegalStateException("The collector must be sealed before calling getCLPStats");
+    }
+    if (_clpStatsCollector == null) {
+      throw new IllegalStateException(
+          "CLP stats are only collected for columns using the CLP compression codec, column: "
+              + _fieldSpec.getName());
+    }
+    return _clpStatsCollector.getCLPStats();
+  }
+
@Override
public String getMinValue() {
if (_sealed) {
@@ -125,7 +150,77 @@ public class StringColumnPreIndexStatsCollector extends
AbstractColumnStatistics
_sortedValues = _values.toArray(new String[0]);
_values = null;
Arrays.sort(_sortedValues);
+ if (_clpStatsCollector != null) {
+ _clpStatsCollector.seal();
+ }
_sealed = true;
}
}
+
+  /**
+   * Encodes every collected value with CLP and accumulates statistics about the encoding.
+   * <p>
+   * It tracks the distinct logTypes and dictionary variables (sorted when {@link #seal()} is called), the total
+   * number of dictionary and encoded variables, and the largest number of encoded variables in a single value.
+   * The result is exposed as a {@link CLPStats}.
+   */
+  @VisibleForTesting
+  public static class CLPStatsCollector {
+    private final EncodedMessage _clpEncodedMessage;
+    private final MessageEncoder _clpMessageEncoder;
+    int _totalNumberOfDictVars = 0;
+    int _totalNumberOfEncodedVars = 0;
+    int _maxNumberOfEncodedVars = 0;
+    private Set<String> _logTypes = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
+    private Set<String> _dictVars = new ObjectOpenHashSet<>(AbstractColumnStatisticsCollector.INITIAL_HASH_SET_SIZE);
+    private CLPStats _clpStats;
+
+    public CLPStatsCollector() {
+      // One EncodedMessage is reused as the output of every encodeMessage() call
+      _clpEncodedMessage = new EncodedMessage();
+      _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+          BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    }
+
+    /**
+     * Encodes {@code value} with CLP and adds its logType, dictionary variables and encoded variables to the
+     * running statistics.
+     *
+     * @throws IllegalArgumentException if CLP fails to encode the value
+     */
+    public void collect(String value) {
+      String logType;
+      String[] dictVars;
+      Long[] encodedVars;
+      try {
+        _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+        logType = _clpEncodedMessage.getLogTypeAsString();
+        dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to encode message: " + value, e);
+      }
+
+      // A null component is replaced by a single default-null placeholder. For the encoded variables only the
+      // count matters here, so a null array counts as one DEFAULT_DIMENSION_NULL_VALUE_OF_LONG entry.
+      String effectiveLogType = (logType == null) ? FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING : logType;
+      String[] effectiveDictVars =
+          (dictVars == null) ? new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING} : dictVars;
+      int numEncodedVars = (encodedVars == null) ? 1 : encodedVars.length;
+      accumulate(effectiveLogType, effectiveDictVars, numEncodedVars);
+    }
+
+    // Records one encoded value into the distinct-value sets and the running counters.
+    private void accumulate(String logType, String[] dictVars, int numEncodedVars) {
+      _logTypes.add(logType);
+      for (String dictVar : dictVars) {
+        _dictVars.add(dictVar);
+      }
+      _totalNumberOfDictVars += dictVars.length;
+      _totalNumberOfEncodedVars += numEncodedVars;
+      if (numEncodedVars > _maxNumberOfEncodedVars) {
+        _maxNumberOfEncodedVars = numEncodedVars;
+      }
+    }
+
+    /**
+     * Sorts the distinct logTypes and dictionary variables, releases the hash sets, and freezes the statistics
+     * into a {@link CLPStats}.
+     * <p>
+     * {@link #collect(String)} cannot be called afterwards, since the sets are released.
+     */
+    public void seal() {
+      String[] sortedLogTypeValues = toSortedArray(_logTypes);
+      _logTypes = null;
+      String[] sortedDictVarValues = toSortedArray(_dictVars);
+      _dictVars = null;
+      _clpStats = new CLPStats(sortedLogTypeValues, sortedDictVarValues, _totalNumberOfDictVars,
+          _totalNumberOfEncodedVars, _maxNumberOfEncodedVars);
+    }
+
+    // Copies the set into a new array in natural (lexicographic) order.
+    private static String[] toSortedArray(Set<String> values) {
+      String[] sorted = values.toArray(new String[0]);
+      Arrays.sort(sorted);
+      return sorted;
+    }
+
+    /**
+     * @return the statistics built by {@link #seal()}, or {@code null} if {@code seal()} has not been called yet
+     */
+    public CLPStats getCLPStats() {
+      return _clpStats;
+    }
+  }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index ded5000b0c..cd0417a258 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
import java.io.File;
import java.io.IOException;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
@@ -34,6 +35,7 @@ import
org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -69,6 +71,9 @@ public class ForwardIndexCreatorFactory {
} else {
// Dictionary disabled columns
DataType storedType = fieldSpec.getDataType().getStoredType();
+ if (indexConfig.getCompressionCodec() ==
FieldConfig.CompressionCodec.CLP) {
+ return new CLPForwardIndexCreatorV1(indexDir, columnName,
numTotalDocs, context.getColumnStatistics());
+ }
ChunkCompressionType chunkCompressionType =
indexConfig.getChunkCompressionType();
if (chunkCompressionType == null) {
chunkCompressionType =
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index b5ca2b83c1..86a1038dad 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -19,7 +19,10 @@
package org.apache.pinot.segment.local.segment.index.forward;
+import org.apache.commons.lang.ArrayUtils;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
@@ -83,6 +86,13 @@ public class ForwardIndexReaderFactory extends
IndexReaderFactory.Default<Forwar
}
}
} else {
+ if (dataBuffer.size() >= CLPForwardIndexCreatorV1.MAGIC_BYTES.length) {
+ byte[] magicBytes = new
byte[CLPForwardIndexCreatorV1.MAGIC_BYTES.length];
+ dataBuffer.copyTo(0, magicBytes);
+ if (ArrayUtils.isEquals(magicBytes,
CLPForwardIndexCreatorV1.MAGIC_BYTES)) {
+ return new CLPForwardIndexReaderV1(dataBuffer,
metadata.getTotalDocs());
+ }
+ }
return createRawIndexReader(dataBuffer,
metadata.getDataType().getStoredType(), metadata.isSingleValue());
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index a454bf9cfb..4172a6bdbb 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
import
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex;
@@ -68,7 +69,7 @@ public class ForwardIndexType extends
AbstractIndexType<ForwardIndexConfig, Forw
public static final String INDEX_DISPLAY_NAME = "forward";
// For multi-valued column, forward-index.
// Maximum number of multi-values per row. We assert on this.
- private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+ public static final int MAX_MULTI_VALUES_PER_ROW = 1000;
private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
private static final List<String> EXTENSIONS = Lists.newArrayList(
@@ -283,6 +284,9 @@ public class ForwardIndexType extends
AbstractIndexType<ForwardIndexConfig, Forw
// Use a smaller capacity as opposed to segment flush size
int initialCapacity = Math.min(context.getCapacity(),
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+ if (config.getCompressionCodec() == CompressionCodec.CLP) {
+ return new CLPMutableForwardIndex(column, storedType,
context.getMemoryManager(), context.getCapacity());
+ }
return new VarByteSVMutableForwardIndex(storedType,
context.getMemoryManager(), allocationContext,
initialCapacity,
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
new file mode 100644
index 0000000000..d68a80ae20
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/CLPForwardIndexReaderV1.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+
+/**
+ * Reader for the CLP-encoded, single-value STRING raw forward index written by {@link CLPForwardIndexCreatorV1}.
+ * <p>
+ * Buffer layout, as parsed by the constructor:
+ * <ol>
+ *   <li>the creator's magic bytes;</li>
+ *   <li>nine 4-byte ints: version, total number of dictionary-variable values, logType dictionary bytes per value,
+ *   dictVars dictionary bytes per value, and the byte lengths of the five sections below;</li>
+ *   <li>five sections, back to back: logType dictionary, dictVars dictionary, logType forward index (single-value
+ *   dictionary ids), dictVars forward index (multi-value dictionary ids), encodedVars forward index (multi-value
+ *   longs).</li>
+ * </ol>
+ * <p>
+ * A document is rebuilt by looking up its logType and dictionary variables and passing them, together with its
+ * encoded variables, to the CLP {@link MessageDecoder}.
+ */
+public class CLPForwardIndexReaderV1 implements ForwardIndexReader<CLPForwardIndexReaderV1.CLPReaderContext> {
+  private final int _version;
+  private final int _numDocs;
+  private final int _totalDictVarValues;
+  private final int _logTypeDictNumBytesPerValue;
+  private final int _dictVarsDictNumBytesPerValue;
+  private final int _logTypeDictReaderStartOffset;
+  private final VarLengthValueReader _logTypeDictReader;
+  private final int _dictVarsDictReaderStartOffset;
+  private final VarLengthValueReader _dictVarsDictReader;
+  private final int _logTypeFwdIndexReaderStartOffset;
+  private final FixedBitSVForwardIndexReader _logTypeFwdIndexReader;
+  private final int _dictVarsFwdIndexReaderStartOffset;
+  private final FixedBitMVForwardIndexReader _dictVarsFwdIndexReader;
+  private final int _encodedVarFwdIndexReaderStartOffset;
+  private final VarByteChunkForwardIndexReaderV4 _encodedVarFwdIndexReader;
+  // NOTE(review): one decoder instance is shared by every getString() call; confirm MessageDecoder is safe for
+  // concurrent use if this reader is queried from several threads.
+  private final MessageDecoder _clpMessageDecoder;
+
+  /**
+   * @param pinotDataBuffer buffer holding the whole CLP forward index, starting with the magic bytes
+   * @param numDocs number of documents in the index
+   */
+  public CLPForwardIndexReaderV1(PinotDataBuffer pinotDataBuffer, int numDocs) {
+    _numDocs = numDocs;
+    // Skip the magic bytes (ForwardIndexReaderFactory uses them to recognize this format)
+    int offset = CLPForwardIndexCreatorV1.MAGIC_BYTES.length;
+
+    // Fixed-size header
+    _version = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    _totalDictVarValues = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    _logTypeDictNumBytesPerValue = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    _dictVarsDictNumBytesPerValue = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    // Byte lengths of the five sections that follow the header
+    int logTypeDictLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    int dictVarDictLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    int logTypeFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    int dictVarsFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+    int encodedVarFwdIndexLength = pinotDataBuffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    // Section start offsets are remembered so recordDocIdByteRanges() can report ranges relative to this buffer
+    _logTypeDictReaderStartOffset = offset;
+    _logTypeDictReader = new VarLengthValueReader(pinotDataBuffer.view(offset, offset + logTypeDictLength));
+    offset += logTypeDictLength;
+
+    _dictVarsDictReaderStartOffset = offset;
+    _dictVarsDictReader = new VarLengthValueReader(pinotDataBuffer.view(offset, offset + dictVarDictLength));
+    offset += dictVarDictLength;
+
+    // Bits per dictionary id are derived from the largest id (numValues - 1) of the matching dictionary
+    _logTypeFwdIndexReaderStartOffset = offset;
+    _logTypeFwdIndexReader =
+        new FixedBitSVForwardIndexReader(pinotDataBuffer.view(offset, offset + logTypeFwdIndexLength), _numDocs,
+            PinotDataBitSet.getNumBitsPerValue(_logTypeDictReader.getNumValues() - 1));
+    offset += logTypeFwdIndexLength;
+
+    _dictVarsFwdIndexReaderStartOffset = offset;
+    _dictVarsFwdIndexReader =
+        new FixedBitMVForwardIndexReader(pinotDataBuffer.view(offset, offset + dictVarsFwdIndexLength), _numDocs,
+            _totalDictVarValues, PinotDataBitSet.getNumBitsPerValue(_dictVarsDictReader.getNumValues() - 1));
+    offset += dictVarsFwdIndexLength;
+
+    // Last section: no need to advance the offset past it
+    _encodedVarFwdIndexReaderStartOffset = offset;
+    _encodedVarFwdIndexReader =
+        new VarByteChunkForwardIndexReaderV4(pinotDataBuffer.view(offset, offset + encodedVarFwdIndexLength),
+            FieldSpec.DataType.LONG, false);
+
+    _clpMessageDecoder = new MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getStoredType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  /**
+   * Rebuilds the original string for {@code docId} from its logType, dictionary variables and encoded variables.
+   *
+   * @throws RuntimeException wrapping the decoder's IOException if the value cannot be decoded
+   */
+  @Override
+  public String getString(int docId, CLPReaderContext context) {
+    int logTypeDictId = _logTypeFwdIndexReader.getDictId(docId, context._logTypeReaderContext);
+    String logType = _logTypeDictReader.getUnpaddedString(logTypeDictId, _logTypeDictNumBytesPerValue,
+        new byte[_logTypeDictNumBytesPerValue]);
+
+    int[] dictVarsDictIds = _dictVarsFwdIndexReader.getDictIdMV(docId, context._dictVarsReaderContext);
+    String[] dictVars = new String[dictVarsDictIds.length];
+    // A Java String never shares the byte[] it is decoded from, so one scratch buffer can serve every lookup
+    // instead of allocating a new one per dictionary variable.
+    byte[] dictVarBuffer = new byte[_dictVarsDictNumBytesPerValue];
+    for (int i = 0; i < dictVarsDictIds.length; i++) {
+      dictVars[i] =
+          _dictVarsDictReader.getUnpaddedString(dictVarsDictIds[i], _dictVarsDictNumBytesPerValue, dictVarBuffer);
+    }
+
+    long[] encodedVars = _encodedVarFwdIndexReader.getLongMV(docId, context._encodedVarReaderContext);
+
+    try {
+      return _clpMessageDecoder.decodeMessage(logType, dictVars, encodedVars);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to decode CLP-encoded value for docId: " + docId, e);
+    }
+  }
+
+  /**
+   * Nothing to release.
+   * <p>
+   * The sub-readers are views over the buffer passed to the constructor, and this class never closes that buffer.
+   */
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  @Override
+  public CLPReaderContext createContext() {
+    return new CLPReaderContext(_dictVarsFwdIndexReader.createContext(), _logTypeFwdIndexReader.createContext(),
+        _encodedVarFwdIndexReader.createContext());
+  }
+
+  @Override
+  public boolean isBufferByteRangeInfoSupported() {
+    return true;
+  }
+
+  /**
+   * Records every byte range of this index that is read to decode {@code docId}.
+   * <p>
+   * Offsets are relative to the buffer passed to the constructor.
+   */
+  @Override
+  public void recordDocIdByteRanges(int docId, CLPReaderContext context, List<ByteRange> ranges) {
+    // logType forward index entry of the document, then the logType dictionary entry it references
+    int logTypeDictId = _logTypeFwdIndexReader.getDictId(docId, context._logTypeReaderContext);
+    ranges.add(new ByteRange(_logTypeFwdIndexReaderStartOffset + _logTypeFwdIndexReader.getRawDataStartOffset()
+        + (long) _logTypeFwdIndexReader.getDocLength() * docId, _logTypeFwdIndexReader.getDocLength()));
+    _logTypeDictReader.recordOffsetRanges(logTypeDictId, _logTypeDictReaderStartOffset, ranges);
+
+    // dictVars forward index ranges (shifted by the section start), then the referenced dictionary entries
+    int[] dictVarsDictIds = _dictVarsFwdIndexReader.getDictIdMV(docId, context._dictVarsReaderContext);
+    List<ByteRange> fwdIndexByteRanges = new ArrayList<>();
+    _dictVarsFwdIndexReader.recordDocIdByteRanges(docId, context._dictVarsReaderContext, fwdIndexByteRanges);
+    for (ByteRange range : fwdIndexByteRanges) {
+      ranges.add(new ByteRange(_dictVarsFwdIndexReaderStartOffset + range.getOffset(), range.getSizeInBytes()));
+    }
+    fwdIndexByteRanges.clear();
+    for (int dictVarsDictId : dictVarsDictIds) {
+      _dictVarsDictReader.recordOffsetRanges(dictVarsDictId, _dictVarsDictReaderStartOffset, ranges);
+    }
+
+    // encodedVars forward index ranges, shifted by the section start
+    _encodedVarFwdIndexReader.recordDocIdByteRanges(docId, context._encodedVarReaderContext, fwdIndexByteRanges);
+    for (ByteRange range : fwdIndexByteRanges) {
+      ranges.add(new ByteRange(_encodedVarFwdIndexReaderStartOffset + range.getOffset(), range.getSizeInBytes()));
+    }
+  }
+
+  @Override
+  public boolean isFixedOffsetMappingType() {
+    return false;
+  }
+
+  /**
+   * Always reports {@link ChunkCompressionType#PASS_THROUGH}.
+   */
+  @Override
+  public ChunkCompressionType getCompressionType() {
+    return ChunkCompressionType.PASS_THROUGH;
+  }
+
+  /**
+   * Per-caller reader state, bundling the contexts of the three underlying forward index readers.
+   */
+  public static final class CLPReaderContext implements ForwardIndexReaderContext {
+    private final FixedBitMVForwardIndexReader.Context _dictVarsReaderContext;
+    private final ForwardIndexReaderContext _logTypeReaderContext;
+    private final VarByteChunkForwardIndexReaderV4.ReaderContext _encodedVarReaderContext;
+
+    public CLPReaderContext(FixedBitMVForwardIndexReader.Context dictVarsReaderContext,
+        ForwardIndexReaderContext logTypeReaderContext,
+        VarByteChunkForwardIndexReaderV4.ReaderContext encodedVarReaderContext) {
+      _dictVarsReaderContext = dictVarsReaderContext;
+      _logTypeReaderContext = logTypeReaderContext;
+      _encodedVarReaderContext = encodedVarReaderContext;
+    }
+
+    /**
+     * Closes each non-null underlying context.
+     */
+    @Override
+    public void close()
+        throws IOException {
+      if (_dictVarsReaderContext != null) {
+        _dictVarsReaderContext.close();
+      }
+      if (_logTypeReaderContext != null) {
+        _logTypeReaderContext.close();
+      }
+      if (_encodedVarReaderContext != null) {
+        _encodedVarReaderContext.close();
+      }
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 43fd69488b..6c83ae9cb8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -1192,8 +1192,14 @@ public final class TableConfigUtils {
CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
switch (encodingType) {
case RAW:
- Preconditions.checkArgument(compressionCodec == null ||
compressionCodec.isApplicableToRawIndex(),
- "Compression codec: %s is not applicable to raw index",
compressionCodec);
+ Preconditions.checkArgument(compressionCodec == null ||
compressionCodec.isApplicableToRawIndex()
+ || compressionCodec == CompressionCodec.CLP, "Compression
codec: %s is not applicable to raw index",
+ compressionCodec);
+ if (compressionCodec == CompressionCodec.CLP && schema != null) {
+ Preconditions.checkArgument(
+
schema.getFieldSpecFor(columnName).getDataType().getStoredType() ==
DataType.STRING,
+ "CLP compression codec can only be applied to string columns");
+ }
break;
case DICTIONARY:
Preconditions.checkArgument(compressionCodec == null ||
compressionCodec.isApplicableToDictEncodedIndex(),
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
index 69bc85551c..a678be4269 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
@@ -20,9 +20,12 @@ package org.apache.pinot.segment.local.segment.creator;
import java.io.File;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -54,6 +57,7 @@ import
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -149,29 +153,29 @@ public class DictionariesTest {
Assert.assertTrue(heapDictionary instanceof IntDictionary);
Assert.assertTrue(mmapDictionary instanceof IntDictionary);
int firstInt = heapDictionary.getIntValue(0);
- Assert.assertEquals(heapDictionary.indexOf(firstInt),
heapDictionary.indexOf("" + firstInt));
- Assert.assertEquals(mmapDictionary.indexOf(firstInt),
mmapDictionary.indexOf("" + firstInt));
+ Assert.assertEquals(heapDictionary.indexOf(firstInt),
heapDictionary.indexOf(String.valueOf(firstInt)));
+ Assert.assertEquals(mmapDictionary.indexOf(firstInt),
mmapDictionary.indexOf(String.valueOf(firstInt)));
break;
case LONG:
Assert.assertTrue(heapDictionary instanceof LongDictionary);
Assert.assertTrue(mmapDictionary instanceof LongDictionary);
long firstLong = heapDictionary.getLongValue(0);
- Assert.assertEquals(heapDictionary.indexOf(firstLong),
heapDictionary.indexOf("" + firstLong));
- Assert.assertEquals(mmapDictionary.indexOf(firstLong),
mmapDictionary.indexOf("" + firstLong));
+ Assert.assertEquals(heapDictionary.indexOf(firstLong),
heapDictionary.indexOf(String.valueOf(firstLong)));
+ Assert.assertEquals(mmapDictionary.indexOf(firstLong),
mmapDictionary.indexOf(String.valueOf(firstLong)));
break;
case FLOAT:
Assert.assertTrue(heapDictionary instanceof FloatDictionary);
Assert.assertTrue(mmapDictionary instanceof FloatDictionary);
float firstFloat = heapDictionary.getFloatValue(0);
- Assert.assertEquals(heapDictionary.indexOf(firstFloat),
heapDictionary.indexOf("" + firstFloat));
- Assert.assertEquals(mmapDictionary.indexOf(firstFloat),
mmapDictionary.indexOf("" + firstFloat));
+ Assert.assertEquals(heapDictionary.indexOf(firstFloat),
heapDictionary.indexOf(String.valueOf(firstFloat)));
+ Assert.assertEquals(mmapDictionary.indexOf(firstFloat),
mmapDictionary.indexOf(String.valueOf(firstFloat)));
break;
case DOUBLE:
Assert.assertTrue(heapDictionary instanceof DoubleDictionary);
Assert.assertTrue(mmapDictionary instanceof DoubleDictionary);
double firstDouble = heapDictionary.getDoubleValue(0);
- Assert.assertEquals(heapDictionary.indexOf(firstDouble),
heapDictionary.indexOf("" + firstDouble));
- Assert.assertEquals(mmapDictionary.indexOf(firstDouble),
mmapDictionary.indexOf("" + firstDouble));
+ Assert.assertEquals(heapDictionary.indexOf(firstDouble),
heapDictionary.indexOf(String.valueOf(firstDouble)));
+ Assert.assertEquals(mmapDictionary.indexOf(firstDouble),
mmapDictionary.indexOf(String.valueOf(firstDouble)));
break;
case BIG_DECIMAL:
Assert.assertTrue(heapDictionary instanceof BigDecimalDictionary);
@@ -530,4 +534,35 @@ public class DictionariesTest {
throw new IllegalArgumentException("Illegal data type for stats
builder: " + dataType);
}
}
+
+  /**
+   * Collects stats for two log lines that share the same format on a RAW column configured with the CLP codec and
+   * checks that the collector reports exactly one distinct log type.
+   */
+  @Test
+  public void clpStatsCollectorTest() {
+    // _tableConfig is shared by the tests in this class; restore its field configs afterwards so the CLP config set
+    // here cannot leak into tests that run later.
+    List<FieldConfig> originalFieldConfigList = _tableConfig.getFieldConfigList();
+    try {
+      Schema schema = new Schema();
+      schema.addField(new DimensionFieldSpec("column1", DataType.STRING, true));
+      List<FieldConfig> fieldConfigList = new ArrayList<>();
+      // Typed empty collections avoid the unchecked conversion of the raw Collections.EMPTY_LIST / EMPTY_MAP.
+      fieldConfigList.add(new FieldConfig("column1", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          FieldConfig.CompressionCodec.CLP, Collections.emptyMap()));
+      _tableConfig.setFieldConfigList(fieldConfigList);
+      StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(_tableConfig, schema, null);
+      StringColumnPreIndexStatsCollector statsCollector =
+          new StringColumnPreIndexStatsCollector("column1", statsCollectorConfig);
+
+      List<String> logLines = new ArrayList<>();
+      logLines.add(
+          "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective:"
+              + " true");
+      logLines.add(
+          "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective:"
+              + " true");
+
+      for (String logLine : logLines) {
+        statsCollector.collect(logLine);
+      }
+      statsCollector.seal();
+
+      Assert.assertNotNull(statsCollector.getCLPStats());
+
+      // Both lines have the same log format, so they yield a single log type.
+      Assert.assertEquals(statsCollector.getCLPStats().getSortedLogTypeValues().length, 1);
+    } finally {
+      _tableConfig.setFieldConfigList(originalFieldConfigList);
+    }
+  }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
new file mode 100644
index 0000000000..c97a1d318f
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class CLPForwardIndexCreatorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"CLPForwardIndexCreatorTest");
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+ }
+
+ @Test
+ public void testCLPWriter()
+ throws Exception {
+ List<String> logLines = new ArrayList<>();
+ logLines.add(
+ "2023/10/26 00:03:10.168 INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+ + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property
LiveInstance took 5 ms. Selective: true");
+ logLines.add(
+ "2023/10/26 00:03:10.169 INFO [PropertyCache]
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+ + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property
LiveInstance took 4 ms. Selective: true");
+ logLines.add(
+ "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter]
[grizzly-http-server-2] Handled request from 0.0"
+ + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness,
content-type null status code 200 OK");
+ logLines.add(
+ "2023/10/27 16:35:10.607 INFO [ControllerResponseFilter]
[grizzly-http-server-6] Handled request from 0.0"
+ + ".0.0 GET
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables,
content-type "
+ + "application/json status code 200 OK");
+ logLines.add("null");
+
+ Schema schema = new Schema();
+ schema.addField(new DimensionFieldSpec("column1",
FieldSpec.DataType.STRING, true));
+ TableConfig tableConfig =
+ new TableConfig("mytable", TableType.REALTIME.name(), new
SegmentsValidationAndRetentionConfig(),
+ new TenantConfig(null, null, null), new IndexingConfig(), new
TableCustomConfig(null), null, null, null,
+ null, null, null, null, null, null, null, null, false, null, null,
null);
+ List<FieldConfig> fieldConfigList = new ArrayList<>();
+ fieldConfigList.add(new FieldConfig("column1",
FieldConfig.EncodingType.RAW, Collections.EMPTY_LIST,
+ FieldConfig.CompressionCodec.CLP, Collections.EMPTY_MAP));
+ tableConfig.setFieldConfigList(fieldConfigList);
+ StatsCollectorConfig statsCollectorConfig = new
StatsCollectorConfig(tableConfig, schema, null);
+ StringColumnPreIndexStatsCollector statsCollector =
+ new StringColumnPreIndexStatsCollector("column1",
statsCollectorConfig);
+ for (String logLine : logLines) {
+ statsCollector.collect(logLine);
+ }
+ statsCollector.seal();
+
+ File indexFile = new File(TEMP_DIR, "column1" +
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ CLPForwardIndexCreatorV1 clpForwardIndexCreatorV1 =
+ new CLPForwardIndexCreatorV1(TEMP_DIR, "column1", logLines.size(),
statsCollector);
+
+ for (String logLine : logLines) {
+ clpForwardIndexCreatorV1.putString(logLine);
+ }
+ clpForwardIndexCreatorV1.seal();
+ clpForwardIndexCreatorV1.close();
+
+ PinotDataBuffer pinotDataBuffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
+ CLPForwardIndexReaderV1 clpForwardIndexReaderV1 = new
CLPForwardIndexReaderV1(pinotDataBuffer, logLines.size());
+ for (int i = 0; i < logLines.size(); i++) {
+ Assert.assertEquals(clpForwardIndexReaderV1.getString(i,
clpForwardIndexReaderV1.createContext()),
+ logLines.get(i));
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
new file mode 100644
index 0000000000..a926b9a396
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
+import
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPMutableForwardIndexTest {
+  private PinotDataBufferMemoryManager _memoryManager;
+
+  @BeforeClass
+  public void setUp() {
+    _memoryManager = new DirectMemoryManager(CLPMutableForwardIndexTest.class.getName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    _memoryManager.close();
+  }
+
+  /**
+   * Writes sample log lines into a {@link CLPMutableForwardIndex}, reads them back, and checks that the CLP stats
+   * reported by the mutable index match those of a standalone
+   * {@link StringColumnPreIndexStatsCollector.CLPStatsCollector} fed the same values.
+   */
+  @Test
+  public void testString()
+      throws IOException {
+    int initialCapacity = 5;
+    try (CLPMutableForwardIndex readerWriter = new CLPMutableForwardIndex("col1", FieldSpec.DataType.STRING,
+        _memoryManager, initialCapacity)) {
+      List<String> logLines = new ArrayList<>();
+      logLines.add(
+          "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective:"
+              + " true");
+      logLines.add(
+          "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+              + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective:"
+              + " true");
+      logLines.add(
+          "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+              + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
+      logLines.add(
+          "2023/10/27 16:35:10.607 INFO [ControllerResponseFilter] [grizzly-http-server-6] Handled request from 0.0"
+              + ".0.0 GET https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, content-type "
+              + "application/json status code 200 OK");
+      logLines.add("null");
+
+      // NOTE(review): only the first `rows` sample lines are written and verified; the last two are currently
+      // unused. Consider covering all of logLines.
+      int rows = 3;
+
+      for (int i = 0; i < rows; i++) {
+        readerWriter.setString(i, logLines.get(i));
+      }
+
+      for (int i = 0; i < rows; i++) {
+        Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
+      }
+
+      // Compute reference CLP stats over the same rows and compare them with the mutable index's stats.
+      StringColumnPreIndexStatsCollector.CLPStatsCollector statsCollector =
+          new StringColumnPreIndexStatsCollector.CLPStatsCollector();
+      for (int i = 0; i < rows; i++) {
+        statsCollector.collect(logLines.get(i));
+      }
+      statsCollector.seal();
+      CLPStatsProvider.CLPStats stats = statsCollector.getCLPStats();
+
+      CLPStatsProvider.CLPStats mutableIndexStats = readerWriter.getCLPStats();
+      Assert.assertEquals(stats.getTotalNumberOfDictVars(), mutableIndexStats.getTotalNumberOfDictVars());
+      Assert.assertEquals(stats.getMaxNumberOfEncodedVars(), mutableIndexStats.getMaxNumberOfEncodedVars());
+      Assert.assertEquals(stats.getTotalNumberOfEncodedVars(), mutableIndexStats.getTotalNumberOfEncodedVars());
+      Assert.assertEquals(stats.getSortedLogTypeValues(), mutableIndexStats.getSortedLogTypeValues());
+      Assert.assertEquals(stats.getSortedDictVarValues(), mutableIndexStats.getSortedDictVarValues());
+    }
+  }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index af0232d661..9e47502098 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -43,6 +43,10 @@ public class ColumnIndexCreationInfo implements Serializable
{
_defaultNullValue = defaultNullValue;
}
+  /**
+   * Returns the column statistics held by this creation info.
+   */
+  public ColumnStatistics getColumnStatistics() {
+    return this._columnStatistics;
+  }
+
public boolean isCreateDictionary() {
return _createDictionary;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 5f4dd2cd66..52df382efa 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -22,12 +22,14 @@ import java.io.File;
import java.util.Objects;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.data.FieldSpec;
/**
- * Provides parameters for constructing indexes via {@link
IndexType#createIndexCreator(IndexCreationContext, Object)}.
+ * Provides parameters for constructing indexes via
+ * {@link IndexType#createIndexCreator(IndexCreationContext, IndexConfig)}.
* The responsibility for ensuring that the correct parameters for a particular
* index type lies with the caller.
*/
@@ -88,7 +90,10 @@ public interface IndexCreationContext {
*/
boolean isTextCommitOnClose();
+ /**
+  * Returns the statistics of the column being indexed. May be {@code null}: the {@link Builder} leaves it unset
+  * unless {@link Builder#withColumnStatistics(ColumnStatistics)} is called.
+  */
+ ColumnStatistics getColumnStatistics();
+
final class Builder {
+ private ColumnStatistics _columnStatistics;
private File _indexDir;
private int _lengthOfLongestEntry;
private int _maxNumberOfMultiValueElements;
@@ -116,11 +121,17 @@ public interface IndexCreationContext {
.withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
.withSortedUniqueElementsArray(columnIndexCreationInfo.getSortedUniqueElementsArray())
+ .withColumnStatistics(columnIndexCreationInfo.getColumnStatistics())
.withCardinality(columnIndexCreationInfo.getDistinctValueCount())
.withFixedLength(columnIndexCreationInfo.isFixedLength())
.sorted(columnIndexCreationInfo.isSorted());
}
+    /**
+     * Sets the column statistics that the built context exposes; may be {@code null}.
+     */
+    public Builder withColumnStatistics(ColumnStatistics columnStatistics) {
+      this._columnStatistics = columnStatistics;
+      return this;
+    }
+
public Builder withIndexDir(File indexDir) {
_indexDir = indexDir;
return this;
@@ -222,7 +233,7 @@ public interface IndexCreationContext {
return new Common(Objects.requireNonNull(_indexDir),
_lengthOfLongestEntry, _maxNumberOfMultiValueElements,
_maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec),
_sorted, _cardinality,
_totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue,
_maxValue, _forwardIndexDisabled,
- _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength,
_textCommitOnClose);
+ _sortedUniqueElementsArray, _optimizedDictionary, _fixedLength,
_textCommitOnClose, _columnStatistics);
}
public Builder withSortedUniqueElementsArray(Object
sortedUniqueElementsArray) {
@@ -255,13 +266,14 @@ public interface IndexCreationContext {
private final boolean _optimizeDictionary;
private final boolean _fixedLength;
private final boolean _textCommitOnClose;
+ private final ColumnStatistics _columnStatistics;
public Common(File indexDir, int lengthOfLongestEntry,
int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean
onHeap,
FieldSpec fieldSpec, boolean sorted, int cardinality, int
totalNumberOfEntries,
int totalDocs, boolean hasDictionary, Comparable<?> minValue,
Comparable<?> maxValue,
boolean forwardIndexDisabled, Object sortedUniqueElementsArray,
boolean optimizeDictionary,
- boolean fixedLength, boolean textCommitOnClose) {
+ boolean fixedLength, boolean textCommitOnClose, ColumnStatistics
columnStatistics) {
_indexDir = indexDir;
_lengthOfLongestEntry = lengthOfLongestEntry;
_maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -280,6 +292,7 @@ public interface IndexCreationContext {
_optimizeDictionary = optimizeDictionary;
_fixedLength = fixedLength;
_textCommitOnClose = textCommitOnClose;
+ _columnStatistics = columnStatistics;
}
public FieldSpec getFieldSpec() {
@@ -360,5 +373,10 @@ public interface IndexCreationContext {
public boolean isTextCommitOnClose() {
return _textCommitOnClose;
}
+
+    /**
+     * Returns the column statistics passed to the constructor (possibly {@code null}).
+     */
+    @Override
+    public ColumnStatistics getColumnStatistics() {
+      return this._columnStatistics;
+    }
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 52abf41ea1..6305dcd852 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -274,7 +274,8 @@ public class SegmentGeneratorConfig implements Serializable
{
if (fieldConfigList != null) {
for (FieldConfig fieldConfig : fieldConfigList) {
if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW
- && fieldConfig.getCompressionCodec() != null) {
+ && fieldConfig.getCompressionCodec() != null
+ && fieldConfig.getCompressionCodec().isApplicableToRawIndex()) {
_rawIndexCreationColumns.add(fieldConfig.getName());
_rawIndexCompressionType.put(fieldConfig.getName(),
ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()));
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
index f868eea15a..fdd8abfcd7 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java
@@ -19,8 +19,10 @@
package org.apache.pinot.segment.spi.creator;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -36,6 +38,7 @@ public class StatsCollectorConfig {
private final TableConfig _tableConfig;
private final Schema _schema;
private final SegmentPartitionConfig _segmentPartitionConfig;
+ private final Map<String, FieldConfig> _columnFieldConfigMap;
/**
* Constructor for the class.
@@ -49,6 +52,12 @@ public class StatsCollectorConfig {
_tableConfig = tableConfig;
_schema = schema;
_segmentPartitionConfig = segmentPartitionConfig;
+ _columnFieldConfigMap = new HashMap<>();
+ if (tableConfig.getFieldConfigList() != null) {
+ for (FieldConfig fieldConfig : tableConfig.getFieldConfigList()) {
+ _columnFieldConfigMap.put(fieldConfig.getName(), fieldConfig);
+ }
+ }
}
@Nullable
@@ -89,4 +98,9 @@ public class StatsCollectorConfig {
public TableConfig getTableConfig() {
return _tableConfig;
}
+
+  /**
+   * Looks up the {@link FieldConfig} declared for {@code column} in the table config's field config list.
+   *
+   * @param column column name
+   * @return the matching field config, or {@code null} when the table config declares none for this column
+   */
+  @Nullable
+  public FieldConfig getFieldConfigForColumn(String column) {
+    return this._columnFieldConfigMap.get(column);
+  }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 5db086a13b..132705036b 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -58,6 +58,7 @@ public class ForwardIndexConfig extends IndexConfig {
if (compressionCodec != null) {
switch (compressionCodec) {
case PASS_THROUGH:
+ case CLP:
_chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
_dictIdCompressionType = null;
break;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 16b50ecbde..704cb2e01c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -126,6 +126,9 @@ public class FieldConfig extends BaseJsonConfig {
SNAPPY(true, false),
ZSTANDARD(true, false),
LZ4(true, false),
+    // CLP is a special compression codec that is not applicable to arbitrary RAW columns; it provides dedicated
+    // handling for log lines (see CLPForwardIndexCreatorV1).
+ CLP(false, false),
// For MV dictionary encoded forward index, add a second level dictionary
encoding for the multi-value entries
MV_ENTRY_DICT(false, true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]