This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 819168f2e8 [Clean up] Remove IntermediateSegment (#10199)
819168f2e8 is described below
commit 819168f2e87e5b0499a64c19a99db3e63033a6e3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jan 27 21:17:30 2023 -0800
[Clean up] Remove IntermediateSegment (#10199)
---
.../indexsegment/mutable/IntermediateSegment.java | 370 ---------------------
.../creator/IntermediateSegmentStatsContainer.java | 54 ---
.../index/column/IntermediateIndexContainer.java | 134 --------
.../mutable/IntermediateSegmentTest.java | 260 ---------------
4 files changed, 818 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
deleted file mode 100644
index 08bbcba7a2..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.mutable;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
-import
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
-import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
-import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
-import
org.apache.pinot.segment.local.segment.index.column.IntermediateIndexContainer;
-import org.apache.pinot.segment.local.segment.index.column.NumValuesInfo;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
-import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
-import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
-import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
-import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.FieldSpec.FieldType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.RowMetadata;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Intermediate segment format to store the collected data so far. This
segment format will be used to generate the
- * final
- * offline segment in SegmentIndexCreationDriver.
- */
-public class IntermediateSegment implements MutableSegment {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IntermediateSegment.class);
-
- private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
- private static final int DEFAULT_CAPACITY = 100_000;
- private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
- private static final int DEFAULT_EST_CARDINALITY = 5000;
- private static final int DEFAULT_AVG_MULTI_VALUE_COUNT = 2;
-
- private final SegmentGeneratorConfig _segmentGeneratorConfig;
- private final Schema _schema;
- private final TableConfig _tableConfig;
- private final String _segmentName;
- private final SegmentMetadata _segmentMetadata;
- private final Map<String, IntermediateIndexContainer> _indexContainerMap =
new HashMap<>();
- private final PinotDataBufferMemoryManager _memoryManager;
- private final File _mmapDir;
-
- private final int _capacity = DEFAULT_CAPACITY;
- private volatile int _numDocsIndexed = 0;
-
- public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
- _segmentGeneratorConfig = segmentGeneratorConfig;
- _schema = segmentGeneratorConfig.getSchema();
- _tableConfig = segmentGeneratorConfig.getTableConfig();
- _segmentName = _segmentGeneratorConfig.getTableName() +
System.currentTimeMillis();
- _segmentMetadata =
- new
SegmentMetadataImpl(TableNameBuilder.extractRawTableName(_tableConfig.getTableName()),
_segmentName,
- _schema, System.currentTimeMillis()) {
- @Override
- public int getTotalDocs() {
- return _numDocsIndexed;
- }
- };
-
- Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
- List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
- for (FieldSpec fieldSpec : allFieldSpecs) {
- if (!fieldSpec.isVirtualColumn()) {
- physicalFieldSpecs.add(fieldSpec);
- }
- }
-
- String outputDir = segmentGeneratorConfig.getOutDir();
- _mmapDir = new File(outputDir, _segmentName + "_mmap_" +
UUID.randomUUID());
- _mmapDir.mkdir();
- LOGGER.info("Mmap file dir: " + _mmapDir);
- _memoryManager = new MmapMemoryManager(_mmapDir.toString(), _segmentName,
null);
-
- // Initialize for each column
- for (FieldSpec fieldSpec : physicalFieldSpecs) {
- String column = fieldSpec.getName();
-
- // Partition info
- SegmentPartitionConfig segmentPartitionConfig =
segmentGeneratorConfig.getSegmentPartitionConfig();
- PartitionFunction partitionFunction = null;
- Set<Integer> partitions = null;
- if (segmentPartitionConfig != null &&
segmentPartitionConfig.getColumnPartitionMap().containsKey(column)) {
- partitionFunction =
-
PartitionFunctionFactory.getPartitionFunction(segmentPartitionConfig.getFunctionName(column),
- segmentPartitionConfig.getNumPartitions(column),
segmentPartitionConfig.getFunctionConfig(column));
- partitions = new HashSet<>();
- partitions.add(segmentGeneratorConfig.getSequenceId());
- }
-
- DataType storedType = fieldSpec.getDataType().getStoredType();
- boolean isFixedWidthColumn = storedType.isFixedWidth();
- MutableForwardIndex forwardIndex;
- MutableDictionary dictionary;
-
- int dictionaryColumnSize;
- if (isFixedWidthColumn) {
- dictionaryColumnSize = storedType.size();
- } else {
- dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
- }
- // NOTE: preserve 10% buffer for cardinality to reduce the chance of
re-sizing the dictionary
- int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
- String dictionaryAllocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Dict.FILE_EXTENSION);
- dictionary = MutableDictionaryFactory.getMutableDictionary(storedType,
true, _memoryManager, dictionaryColumnSize,
- Math.min(estimatedCardinality, _capacity),
dictionaryAllocationContext);
-
- if (fieldSpec.isSingleValueField()) {
- // Single-value dictionary-encoded forward index
- String allocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
- forwardIndex =
- new FixedByteSVMutableForwardIndex(true, DataType.INT, _capacity,
_memoryManager, allocationContext);
- } else {
- // Multi-value dictionary-encoded forward index
- String allocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
- // TODO: Start with a smaller capacity on
FixedByteMVForwardIndexReaderWriter and let it expand
- forwardIndex =
- new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
DEFAULT_AVG_MULTI_VALUE_COUNT, _capacity,
- Integer.BYTES, _memoryManager, allocationContext, true,
DataType.INT);
- }
-
- _indexContainerMap.put(column,
- new IntermediateIndexContainer(fieldSpec, partitionFunction,
partitions, new NumValuesInfo(), forwardIndex,
- dictionary));
- }
- }
-
- @Override
- public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
- throws IOException {
- updateDictionary(row);
- addNewRow(row);
- _numDocsIndexed++;
- return true;
- }
-
- @Override
- public int getNumDocsIndexed() {
- return _numDocsIndexed;
- }
-
- @Override
- public String getSegmentName() {
- return _segmentName;
- }
-
- @Override
- public SegmentMetadata getSegmentMetadata() {
- return _segmentMetadata;
- }
-
- @Override
- public Set<String> getColumnNames() {
- return _schema.getColumnNames();
- }
-
- @Override
- public Set<String> getPhysicalColumnNames() {
- return _schema.getPhysicalColumnNames();
- }
-
- @Override
- public DataSource getDataSource(String columnName) {
- return _indexContainerMap.get(columnName).toDataSource(_numDocsIndexed);
- }
-
- @Override
- public List<StarTreeV2> getStarTrees() {
- return null;
- }
-
- @Nullable
- @Override
- public ThreadSafeMutableRoaringBitmap getValidDocIds() {
- return null;
- }
-
- @Override
- public GenericRow getRecord(int docId, GenericRow reuse) {
- try (PinotSegmentRecordReader recordReader = new
PinotSegmentRecordReader()) {
- recordReader.init(this);
- recordReader.getRecord(docId, reuse);
- return reuse;
- } catch (Exception e) {
- throw new RuntimeException("Caught exception while reading record for
docId: " + docId, e);
- }
- }
-
- @Override
- public Object getValue(int docId, String column) {
- try (PinotSegmentColumnReader columnReader = new
PinotSegmentColumnReader(this, column)) {
- return columnReader.getValue(docId);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format("Caught exception while reading value for docId: %d,
column: %s", docId, column), e);
- }
- }
-
- @Override
- public void destroy() {
- String segmentName = getSegmentName();
- LOGGER.info("Trying to destroy segment : {}", segmentName);
- for (Map.Entry<String, IntermediateIndexContainer> entry :
_indexContainerMap.entrySet()) {
- try {
- entry.getValue().close();
- } catch (IOException e) {
- LOGGER.error("Failed to close indexes for column: {}. Continuing with
error.", entry.getKey(), e);
- }
- }
- FileUtils.deleteQuietly(_mmapDir);
- }
-
- private void updateDictionary(GenericRow row) {
- for (Map.Entry<String, IntermediateIndexContainer> entry :
_indexContainerMap.entrySet()) {
- String column = entry.getKey();
- IntermediateIndexContainer indexContainer = entry.getValue();
- Object value = row.getValue(column);
- MutableDictionary dictionary = indexContainer.getDictionary();
- if (dictionary != null) {
- if (indexContainer.getFieldSpec().isSingleValueField()) {
- indexContainer.setDictId(dictionary.index(value));
- } else {
- indexContainer.setDictIds(dictionary.index((Object[]) value));
- }
-
- // Update min/max value from dictionary
- indexContainer.setMinValue(dictionary.getMinVal());
- indexContainer.setMaxValue(dictionary.getMaxVal());
- }
- }
- }
-
- private void addNewRow(GenericRow row)
- throws IOException {
- int docId = _numDocsIndexed;
- for (Map.Entry<String, IntermediateIndexContainer> entry :
_indexContainerMap.entrySet()) {
- String column = entry.getKey();
- IntermediateIndexContainer indexContainer = entry.getValue();
- Object value = row.getValue(column);
- FieldSpec fieldSpec = indexContainer.getFieldSpec();
- if (fieldSpec.isSingleValueField()) {
- // Update numValues info
- indexContainer.getNumValuesInfo().updateSVEntry();
-
- // Update indexes
- MutableForwardIndex forwardIndex = indexContainer.getForwardIndex();
- int dictId = indexContainer.getDictId();
- if (dictId >= 0) {
- // Dictionary-encoded single-value column
-
- // Update forward index
- forwardIndex.setDictId(docId, dictId);
- } else {
- // Single-value column with raw index
-
- // Update forward index
- DataType dataType = fieldSpec.getDataType();
- switch (dataType.getStoredType()) {
- case INT:
- forwardIndex.setInt(docId, (Integer) value);
- break;
- case LONG:
- forwardIndex.setLong(docId, (Long) value);
- break;
- case FLOAT:
- forwardIndex.setFloat(docId, (Float) value);
- break;
- case DOUBLE:
- forwardIndex.setDouble(docId, (Double) value);
- break;
- case STRING:
- forwardIndex.setString(docId, (String) value);
- break;
- case BYTES:
- forwardIndex.setBytes(docId, (byte[]) value);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported data type: " + dataType + " for no-dictionary
column: " + column);
- }
-
- // Update min/max value from raw value
- // NOTE: Skip updating min/max value for aggregated metrics because
the value will change over time.
- if (fieldSpec.getFieldType() != FieldType.METRIC) {
- Comparable comparable;
- if (dataType == DataType.BYTES) {
- comparable = new ByteArray((byte[]) value);
- } else {
- comparable = (Comparable) value;
- }
- if (indexContainer.getMinValue() == null) {
- indexContainer.setMinValue(comparable);
- indexContainer.setMaxValue(comparable);
- } else {
- if (comparable.compareTo(indexContainer.getMinValue()) < 0) {
- indexContainer.setMinValue(comparable);
- }
- if (comparable.compareTo(indexContainer.getMaxValue()) > 0) {
- indexContainer.setMaxValue(comparable);
- }
- }
- }
- }
- } else {
- // Multi-value column (always dictionary-encoded)
- int[] dictIds = indexContainer.getDictIds();
-
- // Update numValues info
- indexContainer.getNumValuesInfo().updateMVEntry(dictIds.length);
-
- // Update forward index
- indexContainer.getForwardIndex().setDictIdMV(docId, dictIds);
- }
- }
- }
-
- private String buildAllocationContext(String segmentName, String columnName,
String indexType) {
- return segmentName + ":" + columnName + indexType;
- }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java
deleted file mode 100644
index edec434d66..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.creator;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.segment.local.indexsegment.mutable.IntermediateSegment;
-import
org.apache.pinot.segment.local.realtime.converter.stats.MutableColumnStatistics;
-import org.apache.pinot.segment.spi.creator.ColumnStatistics;
-import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsContainer;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-
-
-public class IntermediateSegmentStatsContainer implements
SegmentPreIndexStatsContainer {
- private final IntermediateSegment _intermediateSegment;
- private final Map<String, ColumnStatistics> _columnStatisticsMap = new
HashMap<>();
-
- public IntermediateSegmentStatsContainer(IntermediateSegment
intermediateSegment) {
- _intermediateSegment = intermediateSegment;
-
- // Create all column statistics
- for (String columnName : intermediateSegment.getPhysicalColumnNames()) {
- DataSource dataSource = intermediateSegment.getDataSource(columnName);
- // Always use dictionary for intermediate segment stats
- _columnStatisticsMap.put(columnName, new
MutableColumnStatistics(dataSource, null));
- }
- }
-
- @Override
- public ColumnStatistics getColumnProfileFor(String column) {
- return _columnStatisticsMap.get(column);
- }
-
- @Override
- public int getTotalDocCount() {
- return _intermediateSegment.getNumDocsIndexed();
- }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
deleted file mode 100644
index dd5e8c4ada..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.column;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import javax.annotation.Nullable;
-import
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
-import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class IntermediateIndexContainer implements Closeable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IntermediateIndexContainer.class);
-
- final FieldSpec _fieldSpec;
- final PartitionFunction _partitionFunction;
- final Set<Integer> _partitions;
- final NumValuesInfo _numValuesInfo;
- final MutableForwardIndex _forwardIndex;
- final MutableDictionary _dictionary;
-
- volatile Comparable _minValue;
- volatile Comparable _maxValue;
-
- // Hold the dictionary id for the latest record
- int _dictId = Integer.MIN_VALUE;
- int[] _dictIds;
-
- public IntermediateIndexContainer(FieldSpec fieldSpec, @Nullable
PartitionFunction partitionFunction,
- @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo,
MutableForwardIndex forwardIndex,
- MutableDictionary dictionary) {
- _fieldSpec = fieldSpec;
- _partitionFunction = partitionFunction;
- _partitions = partitions;
- _numValuesInfo = numValuesInfo;
- _forwardIndex = forwardIndex;
- _dictionary = dictionary;
- }
-
- public DataSource toDataSource(int numDocsIndexed) {
- return new MutableDataSource(_fieldSpec, numDocsIndexed,
_numValuesInfo._numValues,
- _numValuesInfo._maxNumValuesPerMVEntry, _dictionary.length(),
_partitionFunction, _partitions, _minValue,
- _maxValue, _forwardIndex, _dictionary, null, null, null, null, null,
null, null, null);
- }
-
- @Override
- public void close()
- throws IOException {
- String column = _fieldSpec.getName();
- try {
- _forwardIndex.close();
- } catch (Exception e) {
- LOGGER.error("Caught exception while closing forward index for column:
{}, continuing with error", column, e);
- }
- if (_dictionary != null) {
- try {
- _dictionary.close();
- } catch (Exception e) {
- LOGGER.error("Caught exception while closing dictionary for column:
{}, continuing with error", column, e);
- }
- }
- }
-
- public FieldSpec getFieldSpec() {
- return _fieldSpec;
- }
-
- public NumValuesInfo getNumValuesInfo() {
- return _numValuesInfo;
- }
-
- public MutableForwardIndex getForwardIndex() {
- return _forwardIndex;
- }
-
- public MutableDictionary getDictionary() {
- return _dictionary;
- }
-
- public int getDictId() {
- return _dictId;
- }
-
- public void setDictId(int dictId) {
- _dictId = dictId;
- }
-
- public int[] getDictIds() {
- return _dictIds;
- }
-
- public void setDictIds(int[] dictIds) {
- _dictIds = dictIds;
- }
-
- public Comparable getMinValue() {
- return _minValue;
- }
-
- public void setMinValue(Comparable minValue) {
- _minValue = minValue;
- }
-
- public Comparable getMaxValue() {
- return _maxValue;
- }
-
- public void setMaxValue(Comparable maxValue) {
- _maxValue = maxValue;
- }
-}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java
deleted file mode 100644
index c762942626..0000000000
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.mutable;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-
-public class IntermediateSegmentTest {
- private static final String AVRO_DATA_SV = "data/test_data-sv.avro";
- private static final String AVRO_DATA_MV = "data/test_data-mv.avro";
- private static final String SEGMENT_NAME = "testSegmentName";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"IntermediateSegmentTest");
-
- @BeforeMethod
- public void setUp()
- throws Exception {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-
- @AfterMethod
- public void tearDown() {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-
- @DataProvider(name = "segmentCreationTestCases")
- private static Object[][] createSegmentCreationTestCases() {
- return new Object[][]{{AVRO_DATA_SV}, {AVRO_DATA_MV}};
- }
-
- @Test(dataProvider = "segmentCreationTestCases")
- public void testOfflineSegmentCreationFromDifferentWays(String inputFile)
- throws Exception {
- // Get resource file path.
- URL resource = getClass().getClassLoader().getResource(inputFile);
- assertNotNull(resource);
- String filePath = resource.getFile();
-
- Schema schema = createSchema(inputFile);
- TableConfig tableConfig = createTableConfig(inputFile);
-
- // Create the segment generator config.
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
- segmentGeneratorConfig.setInputFilePath(filePath);
- segmentGeneratorConfig.setTableName("testTable");
- segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-
segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6",
"column7"));
-
- IndexSegment segmentFromIntermediateSegment =
buildSegmentFromIntermediateSegment(segmentGeneratorConfig);
- IndexSegment segmentFromAvroRecordReader =
buildSegmentFromAvroRecordReader(segmentGeneratorConfig);
-
- assertNotNull(segmentFromIntermediateSegment);
- assertNotNull(segmentFromAvroRecordReader);
- assertEquals(segmentFromIntermediateSegment.getColumnNames(),
segmentFromAvroRecordReader.getColumnNames());
- Set<String> physicalColumnsFromIntermediateSegment =
segmentFromIntermediateSegment.getPhysicalColumnNames();
- Set<String> physicalColumnsFromAvroSegment =
segmentFromAvroRecordReader.getPhysicalColumnNames();
- assertEquals(physicalColumnsFromIntermediateSegment,
physicalColumnsFromAvroSegment);
-
- // Comparison for every columns
- for (String column : physicalColumnsFromIntermediateSegment) {
- DataSource dataSourceFromIntermediateSegment =
segmentFromIntermediateSegment.getDataSource(column);
- DataSource dataSourceFromAvroRecordReader =
segmentFromAvroRecordReader.getDataSource(column);
-
- // Comparison for dictionaries.
- Dictionary actualDictionary =
dataSourceFromIntermediateSegment.getDictionary();
- Dictionary expectedDictionary =
dataSourceFromAvroRecordReader.getDictionary();
- assertEquals(actualDictionary.getMinVal(),
expectedDictionary.getMinVal());
- assertEquals(actualDictionary.getMaxVal(),
expectedDictionary.getMaxVal());
- assertEquals(actualDictionary.getValueType(),
expectedDictionary.getValueType());
- assertEquals(actualDictionary.length(), expectedDictionary.length());
- int dictionaryLength = actualDictionary.length();
- for (int i = 0; i < dictionaryLength; i++) {
- assertEquals(actualDictionary.get(i), expectedDictionary.get(i));
- }
-
- // Comparison for inverted index
- InvertedIndexReader actualInvertedIndexReader =
dataSourceFromIntermediateSegment.getInvertedIndex();
- InvertedIndexReader expectedInvertedIndexReader =
dataSourceFromAvroRecordReader.getInvertedIndex();
- if (actualInvertedIndexReader != null) {
- for (int j = 0; j < dictionaryLength; j++) {
- assertEquals(actualInvertedIndexReader.getDocIds(j),
expectedInvertedIndexReader.getDocIds(j));
- }
- }
-
- // Check for Partition Metadata.
- SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (segmentPartitionConfig != null &&
segmentPartitionConfig.getColumnPartitionMap().containsKey(column)) {
- ColumnMetadata columnMetadata =
-
segmentFromIntermediateSegment.getSegmentMetadata().getColumnMetadataFor(column);
- assertNotNull(columnMetadata.getPartitionFunction());
- assertEquals(columnMetadata.getPartitionFunction().getName(),
segmentPartitionConfig.getFunctionName(column));
- assertEquals(columnMetadata.getPartitionFunction().getNumPartitions(),
- segmentPartitionConfig.getNumPartitions(column));
- assertEquals(columnMetadata.getPartitionFunction().getFunctionConfig(),
- segmentPartitionConfig.getFunctionConfig(column));
- assertNotNull(columnMetadata.getPartitions());
- assertEquals(columnMetadata.getPartitions().size(), 1);
- }
- }
- }
-
- private IndexSegment
buildSegmentFromIntermediateSegment(SegmentGeneratorConfig
segmentGeneratorConfig)
- throws Exception {
- // Set intermediate segment record reader.
- String segmentName = SEGMENT_NAME + "_from_intermediate_segment";
- segmentGeneratorConfig.setSegmentName(segmentName);
- IntermediateSegment intermediateSegment = new
IntermediateSegment(segmentGeneratorConfig);
-
- // Ingest data.
- ingestDataToIntermediateSegment(segmentGeneratorConfig,
intermediateSegment);
- PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
- recordReader.init(intermediateSegment);
-
- // Build the segment from intermediate segment.
- SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, recordReader);
- driver.build();
-
- // Destroy intermediate segment after the segment creation.
- recordReader.close();
- intermediateSegment.destroy();
-
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
ReadMode.heap);
- }
-
- private IndexSegment buildSegmentFromAvroRecordReader(SegmentGeneratorConfig
segmentGeneratorConfig)
- throws Exception {
- // Use avro record reader by default
- String segmentName = SEGMENT_NAME + "_from_avro_reader";
- segmentGeneratorConfig.setSegmentName(segmentName);
-
- // Build the index segment.
- SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig);
- driver.build();
-
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
ReadMode.heap);
- }
-
- private void ingestDataToIntermediateSegment(SegmentGeneratorConfig
segmentGeneratorConfig,
- IntermediateSegment intermediateSegment)
- throws IOException {
- AvroRecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(new File(segmentGeneratorConfig.getInputFilePath()),
null, null);
-
- GenericRow genericRow = new GenericRow();
- while (avroRecordReader.hasNext()) {
- genericRow.clear();
- genericRow = avroRecordReader.next(genericRow);
- intermediateSegment.index(genericRow, null);
- }
- }
-
- private static Schema createSchema(String inputFile)
- throws IOException {
- Schema schema;
- if (AVRO_DATA_SV.equals(inputFile)) {
- schema = new
Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1",
FieldSpec.DataType.INT)
- .addMetric("column3",
FieldSpec.DataType.INT).addSingleValueDimension("column5",
FieldSpec.DataType.STRING)
- .addSingleValueDimension("column6", FieldSpec.DataType.INT)
- .addSingleValueDimension("column7", FieldSpec.DataType.INT)
- .addSingleValueDimension("column9", FieldSpec.DataType.INT)
- .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
- .addSingleValueDimension("column12",
FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
- .addMetric("column18", FieldSpec.DataType.INT)
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
- } else {
- URL resource =
IntermediateSegmentTest.class.getClassLoader().getResource(inputFile);
- assertNotNull(resource);
- String filePath = resource.getFile();
- schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(new
File(filePath));
- }
- return schema;
- }
-
- private static TableConfig createTableConfig(String inputFile) {
- TableConfig tableConfig;
- if (AVRO_DATA_SV.equals(inputFile)) {
- // The segment generation code in SegmentColumnarIndexCreator will throw
- // exception if start and end time in time column are not in acceptable
- // range. For this test, we first need to fix the input avro data
- // to have the time column values in allowed range. Until then, the check
- // is explicitly disabled
- IngestionConfig ingestionConfig = new IngestionConfig();
- ingestionConfig.setSegmentTimeValueCheck(false);
- ingestionConfig.setRowTimeValueCheck(false);
- tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
- .setInvertedIndexColumns(Arrays.asList("column6", "column7",
"column11", "column17", "column18"))
- .setSegmentPartitionConfig(getSegmentPartitionConfig())
- .setIngestionConfig(ingestionConfig).build();
- } else {
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
- }
- return tableConfig;
- }
-
- private static SegmentPartitionConfig getSegmentPartitionConfig() {
- Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new
HashMap<>();
- ColumnPartitionConfig columnOneConfig = new
ColumnPartitionConfig("Murmur", 1);
- columnPartitionConfigMap.put("column7", columnOneConfig);
- ColumnPartitionConfig columnTwoConfig = new
ColumnPartitionConfig("HashCode", 1);
- columnPartitionConfigMap.put("column11", columnTwoConfig);
- return new SegmentPartitionConfig(columnPartitionConfigMap);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]