This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 819168f2e8 [Clean up] Remove IntermediateSegment (#10199)
819168f2e8 is described below

commit 819168f2e87e5b0499a64c19a99db3e63033a6e3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jan 27 21:17:30 2023 -0800

    [Clean up] Remove IntermediateSegment (#10199)
---
 .../indexsegment/mutable/IntermediateSegment.java  | 370 ---------------------
 .../creator/IntermediateSegmentStatsContainer.java |  54 ---
 .../index/column/IntermediateIndexContainer.java   | 134 --------
 .../mutable/IntermediateSegmentTest.java           | 260 ---------------
 4 files changed, 818 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
deleted file mode 100644
index 08bbcba7a2..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.mutable;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
-import 
org.apache.pinot.segment.local.realtime.impl.dictionary.MutableDictionaryFactory;
-import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
-import 
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
-import 
org.apache.pinot.segment.local.segment.index.column.IntermediateIndexContainer;
-import org.apache.pinot.segment.local.segment.index.column.NumValuesInfo;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
-import org.apache.pinot.segment.spi.MutableSegment;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
-import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
-import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
-import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.FieldSpec.FieldType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.RowMetadata;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Intermediate segment format to store the collected data so far. This 
segment format will be used to generate the
- * final
- * offline segment in SegmentIndexCreationDriver.
- */
-public class IntermediateSegment implements MutableSegment {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IntermediateSegment.class);
-
-  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
-  private static final int DEFAULT_CAPACITY = 100_000;
-  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
-  private static final int DEFAULT_EST_CARDINALITY = 5000;
-  private static final int DEFAULT_AVG_MULTI_VALUE_COUNT = 2;
-
-  private final SegmentGeneratorConfig _segmentGeneratorConfig;
-  private final Schema _schema;
-  private final TableConfig _tableConfig;
-  private final String _segmentName;
-  private final SegmentMetadata _segmentMetadata;
-  private final Map<String, IntermediateIndexContainer> _indexContainerMap = 
new HashMap<>();
-  private final PinotDataBufferMemoryManager _memoryManager;
-  private final File _mmapDir;
-
-  private final int _capacity = DEFAULT_CAPACITY;
-  private volatile int _numDocsIndexed = 0;
-
-  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
-    _segmentGeneratorConfig = segmentGeneratorConfig;
-    _schema = segmentGeneratorConfig.getSchema();
-    _tableConfig = segmentGeneratorConfig.getTableConfig();
-    _segmentName = _segmentGeneratorConfig.getTableName() + 
System.currentTimeMillis();
-    _segmentMetadata =
-        new 
SegmentMetadataImpl(TableNameBuilder.extractRawTableName(_tableConfig.getTableName()),
 _segmentName,
-            _schema, System.currentTimeMillis()) {
-          @Override
-          public int getTotalDocs() {
-            return _numDocsIndexed;
-          }
-        };
-
-    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
-    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
-    for (FieldSpec fieldSpec : allFieldSpecs) {
-      if (!fieldSpec.isVirtualColumn()) {
-        physicalFieldSpecs.add(fieldSpec);
-      }
-    }
-
-    String outputDir = segmentGeneratorConfig.getOutDir();
-    _mmapDir = new File(outputDir, _segmentName + "_mmap_" + 
UUID.randomUUID());
-    _mmapDir.mkdir();
-    LOGGER.info("Mmap file dir: " + _mmapDir);
-    _memoryManager = new MmapMemoryManager(_mmapDir.toString(), _segmentName, 
null);
-
-    // Initialize for each column
-    for (FieldSpec fieldSpec : physicalFieldSpecs) {
-      String column = fieldSpec.getName();
-
-      // Partition info
-      SegmentPartitionConfig segmentPartitionConfig = 
segmentGeneratorConfig.getSegmentPartitionConfig();
-      PartitionFunction partitionFunction = null;
-      Set<Integer> partitions = null;
-      if (segmentPartitionConfig != null && 
segmentPartitionConfig.getColumnPartitionMap().containsKey(column)) {
-        partitionFunction =
-            
PartitionFunctionFactory.getPartitionFunction(segmentPartitionConfig.getFunctionName(column),
-                segmentPartitionConfig.getNumPartitions(column), 
segmentPartitionConfig.getFunctionConfig(column));
-        partitions = new HashSet<>();
-        partitions.add(segmentGeneratorConfig.getSequenceId());
-      }
-
-      DataType storedType = fieldSpec.getDataType().getStoredType();
-      boolean isFixedWidthColumn = storedType.isFixedWidth();
-      MutableForwardIndex forwardIndex;
-      MutableDictionary dictionary;
-
-      int dictionaryColumnSize;
-      if (isFixedWidthColumn) {
-        dictionaryColumnSize = storedType.size();
-      } else {
-        dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
-      }
-      // NOTE: preserve 10% buffer for cardinality to reduce the chance of 
re-sizing the dictionary
-      int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
-      String dictionaryAllocationContext =
-          buildAllocationContext(_segmentName, column, 
V1Constants.Dict.FILE_EXTENSION);
-      dictionary = MutableDictionaryFactory.getMutableDictionary(storedType, 
true, _memoryManager, dictionaryColumnSize,
-          Math.min(estimatedCardinality, _capacity), 
dictionaryAllocationContext);
-
-      if (fieldSpec.isSingleValueField()) {
-        // Single-value dictionary-encoded forward index
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
-        forwardIndex =
-            new FixedByteSVMutableForwardIndex(true, DataType.INT, _capacity, 
_memoryManager, allocationContext);
-      } else {
-        // Multi-value dictionary-encoded forward index
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
-        // TODO: Start with a smaller capacity on 
FixedByteMVForwardIndexReaderWriter and let it expand
-        forwardIndex =
-            new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, 
DEFAULT_AVG_MULTI_VALUE_COUNT, _capacity,
-                Integer.BYTES, _memoryManager, allocationContext, true, 
DataType.INT);
-      }
-
-      _indexContainerMap.put(column,
-          new IntermediateIndexContainer(fieldSpec, partitionFunction, 
partitions, new NumValuesInfo(), forwardIndex,
-              dictionary));
-    }
-  }
-
-  @Override
-  public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
-      throws IOException {
-    updateDictionary(row);
-    addNewRow(row);
-    _numDocsIndexed++;
-    return true;
-  }
-
-  @Override
-  public int getNumDocsIndexed() {
-    return _numDocsIndexed;
-  }
-
-  @Override
-  public String getSegmentName() {
-    return _segmentName;
-  }
-
-  @Override
-  public SegmentMetadata getSegmentMetadata() {
-    return _segmentMetadata;
-  }
-
-  @Override
-  public Set<String> getColumnNames() {
-    return _schema.getColumnNames();
-  }
-
-  @Override
-  public Set<String> getPhysicalColumnNames() {
-    return _schema.getPhysicalColumnNames();
-  }
-
-  @Override
-  public DataSource getDataSource(String columnName) {
-    return _indexContainerMap.get(columnName).toDataSource(_numDocsIndexed);
-  }
-
-  @Override
-  public List<StarTreeV2> getStarTrees() {
-    return null;
-  }
-
-  @Nullable
-  @Override
-  public ThreadSafeMutableRoaringBitmap getValidDocIds() {
-    return null;
-  }
-
-  @Override
-  public GenericRow getRecord(int docId, GenericRow reuse) {
-    try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
-      recordReader.init(this);
-      recordReader.getRecord(docId, reuse);
-      return reuse;
-    } catch (Exception e) {
-      throw new RuntimeException("Caught exception while reading record for 
docId: " + docId, e);
-    }
-  }
-
-  @Override
-  public Object getValue(int docId, String column) {
-    try (PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(this, column)) {
-      return columnReader.getValue(docId);
-    } catch (Exception e) {
-      throw new RuntimeException(
-          String.format("Caught exception while reading value for docId: %d, 
column: %s", docId, column), e);
-    }
-  }
-
-  @Override
-  public void destroy() {
-    String segmentName = getSegmentName();
-    LOGGER.info("Trying to destroy segment : {}", segmentName);
-    for (Map.Entry<String, IntermediateIndexContainer> entry : 
_indexContainerMap.entrySet()) {
-      try {
-        entry.getValue().close();
-      } catch (IOException e) {
-        LOGGER.error("Failed to close indexes for column: {}. Continuing with 
error.", entry.getKey(), e);
-      }
-    }
-    FileUtils.deleteQuietly(_mmapDir);
-  }
-
-  private void updateDictionary(GenericRow row) {
-    for (Map.Entry<String, IntermediateIndexContainer> entry : 
_indexContainerMap.entrySet()) {
-      String column = entry.getKey();
-      IntermediateIndexContainer indexContainer = entry.getValue();
-      Object value = row.getValue(column);
-      MutableDictionary dictionary = indexContainer.getDictionary();
-      if (dictionary != null) {
-        if (indexContainer.getFieldSpec().isSingleValueField()) {
-          indexContainer.setDictId(dictionary.index(value));
-        } else {
-          indexContainer.setDictIds(dictionary.index((Object[]) value));
-        }
-
-        // Update min/max value from dictionary
-        indexContainer.setMinValue(dictionary.getMinVal());
-        indexContainer.setMaxValue(dictionary.getMaxVal());
-      }
-    }
-  }
-
-  private void addNewRow(GenericRow row)
-      throws IOException {
-    int docId = _numDocsIndexed;
-    for (Map.Entry<String, IntermediateIndexContainer> entry : 
_indexContainerMap.entrySet()) {
-      String column = entry.getKey();
-      IntermediateIndexContainer indexContainer = entry.getValue();
-      Object value = row.getValue(column);
-      FieldSpec fieldSpec = indexContainer.getFieldSpec();
-      if (fieldSpec.isSingleValueField()) {
-        // Update numValues info
-        indexContainer.getNumValuesInfo().updateSVEntry();
-
-        // Update indexes
-        MutableForwardIndex forwardIndex = indexContainer.getForwardIndex();
-        int dictId = indexContainer.getDictId();
-        if (dictId >= 0) {
-          // Dictionary-encoded single-value column
-
-          // Update forward index
-          forwardIndex.setDictId(docId, dictId);
-        } else {
-          // Single-value column with raw index
-
-          // Update forward index
-          DataType dataType = fieldSpec.getDataType();
-          switch (dataType.getStoredType()) {
-            case INT:
-              forwardIndex.setInt(docId, (Integer) value);
-              break;
-            case LONG:
-              forwardIndex.setLong(docId, (Long) value);
-              break;
-            case FLOAT:
-              forwardIndex.setFloat(docId, (Float) value);
-              break;
-            case DOUBLE:
-              forwardIndex.setDouble(docId, (Double) value);
-              break;
-            case STRING:
-              forwardIndex.setString(docId, (String) value);
-              break;
-            case BYTES:
-              forwardIndex.setBytes(docId, (byte[]) value);
-              break;
-            default:
-              throw new UnsupportedOperationException(
-                  "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
-          }
-
-          // Update min/max value from raw value
-          // NOTE: Skip updating min/max value for aggregated metrics because 
the value will change over time.
-          if (fieldSpec.getFieldType() != FieldType.METRIC) {
-            Comparable comparable;
-            if (dataType == DataType.BYTES) {
-              comparable = new ByteArray((byte[]) value);
-            } else {
-              comparable = (Comparable) value;
-            }
-            if (indexContainer.getMinValue() == null) {
-              indexContainer.setMinValue(comparable);
-              indexContainer.setMaxValue(comparable);
-            } else {
-              if (comparable.compareTo(indexContainer.getMinValue()) < 0) {
-                indexContainer.setMinValue(comparable);
-              }
-              if (comparable.compareTo(indexContainer.getMaxValue()) > 0) {
-                indexContainer.setMaxValue(comparable);
-              }
-            }
-          }
-        }
-      } else {
-        // Multi-value column (always dictionary-encoded)
-        int[] dictIds = indexContainer.getDictIds();
-
-        // Update numValues info
-        indexContainer.getNumValuesInfo().updateMVEntry(dictIds.length);
-
-        // Update forward index
-        indexContainer.getForwardIndex().setDictIdMV(docId, dictIds);
-      }
-    }
-  }
-
-  private String buildAllocationContext(String segmentName, String columnName, 
String indexType) {
-    return segmentName + ":" + columnName + indexType;
-  }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java
deleted file mode 100644
index edec434d66..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/IntermediateSegmentStatsContainer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.creator;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.segment.local.indexsegment.mutable.IntermediateSegment;
-import 
org.apache.pinot.segment.local.realtime.converter.stats.MutableColumnStatistics;
-import org.apache.pinot.segment.spi.creator.ColumnStatistics;
-import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsContainer;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-
-
-public class IntermediateSegmentStatsContainer implements 
SegmentPreIndexStatsContainer {
-  private final IntermediateSegment _intermediateSegment;
-  private final Map<String, ColumnStatistics> _columnStatisticsMap = new 
HashMap<>();
-
-  public IntermediateSegmentStatsContainer(IntermediateSegment 
intermediateSegment) {
-    _intermediateSegment = intermediateSegment;
-
-    // Create all column statistics
-    for (String columnName : intermediateSegment.getPhysicalColumnNames()) {
-      DataSource dataSource = intermediateSegment.getDataSource(columnName);
-      // Always use dictionary for intermediate segment stats
-      _columnStatisticsMap.put(columnName, new 
MutableColumnStatistics(dataSource, null));
-    }
-  }
-
-  @Override
-  public ColumnStatistics getColumnProfileFor(String column) {
-    return _columnStatisticsMap.get(column);
-  }
-
-  @Override
-  public int getTotalDocCount() {
-    return _intermediateSegment.getNumDocsIndexed();
-  }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
deleted file mode 100644
index dd5e8c4ada..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.index.column;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import javax.annotation.Nullable;
-import 
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.mutable.MutableDictionary;
-import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
-import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class IntermediateIndexContainer implements Closeable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IntermediateIndexContainer.class);
-
-  final FieldSpec _fieldSpec;
-  final PartitionFunction _partitionFunction;
-  final Set<Integer> _partitions;
-  final NumValuesInfo _numValuesInfo;
-  final MutableForwardIndex _forwardIndex;
-  final MutableDictionary _dictionary;
-
-  volatile Comparable _minValue;
-  volatile Comparable _maxValue;
-
-  // Hold the dictionary id for the latest record
-  int _dictId = Integer.MIN_VALUE;
-  int[] _dictIds;
-
-  public IntermediateIndexContainer(FieldSpec fieldSpec, @Nullable 
PartitionFunction partitionFunction,
-      @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, 
MutableForwardIndex forwardIndex,
-      MutableDictionary dictionary) {
-    _fieldSpec = fieldSpec;
-    _partitionFunction = partitionFunction;
-    _partitions = partitions;
-    _numValuesInfo = numValuesInfo;
-    _forwardIndex = forwardIndex;
-    _dictionary = dictionary;
-  }
-
-  public DataSource toDataSource(int numDocsIndexed) {
-    return new MutableDataSource(_fieldSpec, numDocsIndexed, 
_numValuesInfo._numValues,
-        _numValuesInfo._maxNumValuesPerMVEntry, _dictionary.length(), 
_partitionFunction, _partitions, _minValue,
-        _maxValue, _forwardIndex, _dictionary, null, null, null, null, null, 
null, null, null);
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    String column = _fieldSpec.getName();
-    try {
-      _forwardIndex.close();
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while closing forward index for column: 
{}, continuing with error", column, e);
-    }
-    if (_dictionary != null) {
-      try {
-        _dictionary.close();
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while closing dictionary for column: 
{}, continuing with error", column, e);
-      }
-    }
-  }
-
-  public FieldSpec getFieldSpec() {
-    return _fieldSpec;
-  }
-
-  public NumValuesInfo getNumValuesInfo() {
-    return _numValuesInfo;
-  }
-
-  public MutableForwardIndex getForwardIndex() {
-    return _forwardIndex;
-  }
-
-  public MutableDictionary getDictionary() {
-    return _dictionary;
-  }
-
-  public int getDictId() {
-    return _dictId;
-  }
-
-  public void setDictId(int dictId) {
-    _dictId = dictId;
-  }
-
-  public int[] getDictIds() {
-    return _dictIds;
-  }
-
-  public void setDictIds(int[] dictIds) {
-    _dictIds = dictIds;
-  }
-
-  public Comparable getMinValue() {
-    return _minValue;
-  }
-
-  public void setMinValue(Comparable minValue) {
-    _minValue = minValue;
-  }
-
-  public Comparable getMaxValue() {
-    return _maxValue;
-  }
-
-  public void setMaxValue(Comparable maxValue) {
-    _maxValue = maxValue;
-  }
-}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java
deleted file mode 100644
index c762942626..0000000000
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegmentTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.mutable;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
-import org.apache.pinot.segment.spi.ColumnMetadata;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
-import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-
-public class IntermediateSegmentTest {
-  private static final String AVRO_DATA_SV = "data/test_data-sv.avro";
-  private static final String AVRO_DATA_MV = "data/test_data-mv.avro";
-  private static final String SEGMENT_NAME = "testSegmentName";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"IntermediateSegmentTest");
-
-  @BeforeMethod
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-
-  @AfterMethod
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-
-  @DataProvider(name = "segmentCreationTestCases")
-  private static Object[][] createSegmentCreationTestCases() {
-    return new Object[][]{{AVRO_DATA_SV}, {AVRO_DATA_MV}};
-  }
-
-  @Test(dataProvider = "segmentCreationTestCases")
-  public void testOfflineSegmentCreationFromDifferentWays(String inputFile)
-      throws Exception {
-    // Get resource file path.
-    URL resource = getClass().getClassLoader().getResource(inputFile);
-    assertNotNull(resource);
-    String filePath = resource.getFile();
-
-    Schema schema = createSchema(inputFile);
-    TableConfig tableConfig = createTableConfig(inputFile);
-
-    // Create the segment generator config.
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
-    segmentGeneratorConfig.setInputFilePath(filePath);
-    segmentGeneratorConfig.setTableName("testTable");
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    
segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6", 
"column7"));
-
-    IndexSegment segmentFromIntermediateSegment = 
buildSegmentFromIntermediateSegment(segmentGeneratorConfig);
-    IndexSegment segmentFromAvroRecordReader = 
buildSegmentFromAvroRecordReader(segmentGeneratorConfig);
-
-    assertNotNull(segmentFromIntermediateSegment);
-    assertNotNull(segmentFromAvroRecordReader);
-    assertEquals(segmentFromIntermediateSegment.getColumnNames(), 
segmentFromAvroRecordReader.getColumnNames());
-    Set<String> physicalColumnsFromIntermediateSegment = 
segmentFromIntermediateSegment.getPhysicalColumnNames();
-    Set<String> physicalColumnsFromAvroSegment = 
segmentFromAvroRecordReader.getPhysicalColumnNames();
-    assertEquals(physicalColumnsFromIntermediateSegment, 
physicalColumnsFromAvroSegment);
-
-    // Comparison for every columns
-    for (String column : physicalColumnsFromIntermediateSegment) {
-      DataSource dataSourceFromIntermediateSegment = 
segmentFromIntermediateSegment.getDataSource(column);
-      DataSource dataSourceFromAvroRecordReader = 
segmentFromAvroRecordReader.getDataSource(column);
-
-      // Comparison for dictionaries.
-      Dictionary actualDictionary = 
dataSourceFromIntermediateSegment.getDictionary();
-      Dictionary expectedDictionary = 
dataSourceFromAvroRecordReader.getDictionary();
-      assertEquals(actualDictionary.getMinVal(), 
expectedDictionary.getMinVal());
-      assertEquals(actualDictionary.getMaxVal(), 
expectedDictionary.getMaxVal());
-      assertEquals(actualDictionary.getValueType(), 
expectedDictionary.getValueType());
-      assertEquals(actualDictionary.length(), expectedDictionary.length());
-      int dictionaryLength = actualDictionary.length();
-      for (int i = 0; i < dictionaryLength; i++) {
-        assertEquals(actualDictionary.get(i), expectedDictionary.get(i));
-      }
-
-      // Comparison for inverted index
-      InvertedIndexReader actualInvertedIndexReader = 
dataSourceFromIntermediateSegment.getInvertedIndex();
-      InvertedIndexReader expectedInvertedIndexReader = 
dataSourceFromAvroRecordReader.getInvertedIndex();
-      if (actualInvertedIndexReader != null) {
-        for (int j = 0; j < dictionaryLength; j++) {
-          assertEquals(actualInvertedIndexReader.getDocIds(j), 
expectedInvertedIndexReader.getDocIds(j));
-        }
-      }
-
-      // Check for Partition Metadata.
-      SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-      if (segmentPartitionConfig != null && 
segmentPartitionConfig.getColumnPartitionMap().containsKey(column)) {
-        ColumnMetadata columnMetadata =
-            
segmentFromIntermediateSegment.getSegmentMetadata().getColumnMetadataFor(column);
-        assertNotNull(columnMetadata.getPartitionFunction());
-        assertEquals(columnMetadata.getPartitionFunction().getName(), 
segmentPartitionConfig.getFunctionName(column));
-        assertEquals(columnMetadata.getPartitionFunction().getNumPartitions(),
-            segmentPartitionConfig.getNumPartitions(column));
-        assertEquals(columnMetadata.getPartitionFunction().getFunctionConfig(),
-            segmentPartitionConfig.getFunctionConfig(column));
-        assertNotNull(columnMetadata.getPartitions());
-        assertEquals(columnMetadata.getPartitions().size(), 1);
-      }
-    }
-  }
-
-  private IndexSegment 
buildSegmentFromIntermediateSegment(SegmentGeneratorConfig 
segmentGeneratorConfig)
-      throws Exception {
-    // Set intermediate segment record reader.
-    String segmentName = SEGMENT_NAME + "_from_intermediate_segment";
-    segmentGeneratorConfig.setSegmentName(segmentName);
-    IntermediateSegment intermediateSegment = new 
IntermediateSegment(segmentGeneratorConfig);
-
-    // Ingest data.
-    ingestDataToIntermediateSegment(segmentGeneratorConfig, 
intermediateSegment);
-    PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
-    recordReader.init(intermediateSegment);
-
-    // Build the segment from intermediate segment.
-    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, recordReader);
-    driver.build();
-
-    // Destroy intermediate segment after the segment creation.
-    recordReader.close();
-    intermediateSegment.destroy();
-
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
ReadMode.heap);
-  }
-
-  private IndexSegment buildSegmentFromAvroRecordReader(SegmentGeneratorConfig 
segmentGeneratorConfig)
-      throws Exception {
-    // Use avro record reader by default
-    String segmentName = SEGMENT_NAME + "_from_avro_reader";
-    segmentGeneratorConfig.setSegmentName(segmentName);
-
-    // Build the index segment.
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig);
-    driver.build();
-
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
ReadMode.heap);
-  }
-
-  private void ingestDataToIntermediateSegment(SegmentGeneratorConfig 
segmentGeneratorConfig,
-      IntermediateSegment intermediateSegment)
-      throws IOException {
-    AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(new File(segmentGeneratorConfig.getInputFilePath()), 
null, null);
-
-    GenericRow genericRow = new GenericRow();
-    while (avroRecordReader.hasNext()) {
-      genericRow.clear();
-      genericRow = avroRecordReader.next(genericRow);
-      intermediateSegment.index(genericRow, null);
-    }
-  }
-
-  private static Schema createSchema(String inputFile)
-      throws IOException {
-    Schema schema;
-    if (AVRO_DATA_SV.equals(inputFile)) {
-      schema = new 
Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", 
FieldSpec.DataType.INT)
-          .addMetric("column3", 
FieldSpec.DataType.INT).addSingleValueDimension("column5", 
FieldSpec.DataType.STRING)
-          .addSingleValueDimension("column6", FieldSpec.DataType.INT)
-          .addSingleValueDimension("column7", FieldSpec.DataType.INT)
-          .addSingleValueDimension("column9", FieldSpec.DataType.INT)
-          .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
-          .addSingleValueDimension("column12", 
FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
-          .addMetric("column18", FieldSpec.DataType.INT)
-          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
-    } else {
-      URL resource = 
IntermediateSegmentTest.class.getClassLoader().getResource(inputFile);
-      assertNotNull(resource);
-      String filePath = resource.getFile();
-      schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(new 
File(filePath));
-    }
-    return schema;
-  }
-
-  private static TableConfig createTableConfig(String inputFile) {
-    TableConfig tableConfig;
-    if (AVRO_DATA_SV.equals(inputFile)) {
-      // The segment generation code in SegmentColumnarIndexCreator will throw
-      // exception if start and end time in time column are not in acceptable
-      // range. For this test, we first need to fix the input avro data
-      // to have the time column values in allowed range. Until then, the check
-      // is explicitly disabled
-      IngestionConfig ingestionConfig = new IngestionConfig();
-      ingestionConfig.setSegmentTimeValueCheck(false);
-      ingestionConfig.setRowTimeValueCheck(false);
-      tableConfig =
-          new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
-              .setInvertedIndexColumns(Arrays.asList("column6", "column7", 
"column11", "column17", "column18"))
-              .setSegmentPartitionConfig(getSegmentPartitionConfig())
-              .setIngestionConfig(ingestionConfig).build();
-    } else {
-      tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
-    }
-    return tableConfig;
-  }
-
-  private static SegmentPartitionConfig getSegmentPartitionConfig() {
-    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new 
HashMap<>();
-    ColumnPartitionConfig columnOneConfig = new 
ColumnPartitionConfig("Murmur", 1);
-    columnPartitionConfigMap.put("column7", columnOneConfig);
-    ColumnPartitionConfig columnTwoConfig = new 
ColumnPartitionConfig("HashCode", 1);
-    columnPartitionConfigMap.put("column11", columnTwoConfig);
-    return new SegmentPartitionConfig(columnPartitionConfigMap);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to