This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 23ffd67 Handle the virtual column logic in FieldSpec instead of
Schema (#4562)
23ffd67 is described below
commit 23ffd67d871e26b03fba1da16250c061924dbdf0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Aug 25 16:27:32 2019 -0700
Handle the virtual column logic in FieldSpec instead of Schema (#4562)
A virtual column should be tied to its FieldSpec, and should be identified by
whether the virtualColumnProvider is configured.
E.g. a column whose name starts with '$' but which has no virtualColumnProvider
should not be considered a virtual column, because it cannot be initialized.
Also fix a bug in MutableSegmentImpl where virtual columns are added to the
physical dimensions/metrics lists.
---
.../org/apache/pinot/common/data/FieldSpec.java | 11 +++
.../java/org/apache/pinot/common/data/Schema.java | 17 +---
.../generator/SegmentGeneratorConfig.java | 8 +-
.../immutable/ImmutableSegmentImpl.java | 21 ++--
.../immutable/ImmutableSegmentLoader.java | 6 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 107 ++++++++++-----------
.../creator/impl/SegmentColumnarIndexCreator.java | 5 +-
.../impl/SegmentIndexCreationDriverImpl.java | 20 ++--
.../VirtualColumnProviderFactory.java | 6 +-
.../MutableSegmentImplAggregateMetricsTest.java | 28 ++++--
.../query/comparison/SegmentInfoProvider.java | 43 +++++----
11 files changed, 137 insertions(+), 135 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
index beac069..d7e92d6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.data;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import javax.annotation.Nullable;
@@ -153,6 +154,16 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, ConfigNodeLife
_virtualColumnProvider = virtualColumnProvider;
}
+ /**
+ * Returns whether the column is virtual. Virtual columns are constructed
while loading the segment, thus do not exist
+ * in the record, nor should be persisted to the disk.
+ * <p>Identify a column as virtual if the virtual column provider is
configured.
+ */
+ @JsonIgnore
+ public boolean isVirtualColumn() {
+ return _virtualColumnProvider != null && !_virtualColumnProvider.isEmpty();
+ }
+
public Object getDefaultNullValue() {
return _defaultNullValue;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
index f06b14c..90c4bcf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
@@ -275,15 +275,13 @@ public final class Schema {
@JsonIgnore
@Nonnull
public Set<String> getPhysicalColumnNames() {
- Set<String> cols = new HashSet<>();
- cols.addAll(_fieldSpecMap.keySet());
- for (String col : _fieldSpecMap.keySet()) {
- // exclude virtual columns
- if (isVirtualColumn(col)) {
- cols.remove(col);
+ Set<String> physicalColumnNames = new HashSet<>();
+ for (FieldSpec fieldSpec : _fieldSpecMap.values()) {
+ if (!fieldSpec.isVirtualColumn()) {
+ physicalColumnNames.add(fieldSpec.getName());
}
}
- return cols;
+ return physicalColumnNames;
}
@JsonIgnore
@@ -671,9 +669,4 @@ public final class Schema {
result = EqualityUtils.hashCodeOf(result, _dateTimeFieldSpecs);
return result;
}
-
- public boolean isVirtualColumn(String columnName) {
- return columnName.startsWith("$") ||
(getFieldSpecFor(columnName).getVirtualColumnProvider() != null
- && !getFieldSpecFor(columnName).getVirtualColumnProvider().isEmpty());
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index ae6c95d..e840c10 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -667,13 +667,13 @@ public class SegmentGeneratorConfig {
private String getQualifyingFields(FieldType type, boolean
excludeVirtualColumns) {
List<String> fields = new ArrayList<>();
- for (final FieldSpec spec : getSchema().getAllFieldSpecs()) {
- if (excludeVirtualColumns &&
getSchema().isVirtualColumn(spec.getName())) {
+ for (FieldSpec fieldSpec : getSchema().getAllFieldSpecs()) {
+ if (excludeVirtualColumns && fieldSpec.isVirtualColumn()) {
continue;
}
- if (spec.getFieldType() == type) {
- fields.add(spec.getName());
+ if (fieldSpec.getFieldType() == type) {
+ fields.add(fieldSpec.getName());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
index 04c55fd..d040fe9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -19,7 +19,6 @@
package org.apache.pinot.core.indexsegment.immutable;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -100,15 +99,7 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
@Override
public Set<String> getPhysicalColumnNames() {
- HashSet<String> physicalColumnNames = new HashSet<>();
-
- for (String columnName : getColumnNames()) {
- if (!_segmentMetadata.getSchema().isVirtualColumn(columnName)) {
- physicalColumnNames.add(columnName);
- }
- }
-
- return physicalColumnNames;
+ return _segmentMetadata.getSchema().getPhysicalColumnNames();
}
@Override
@@ -164,12 +155,12 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
public GenericRow getRecord(int docId, GenericRow reuse) {
Schema schema = _segmentMetadata.getSchema();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- String column = fieldSpec.getName();
- if (!schema.isVirtualColumn(column)) {
- ColumnIndexContainer indexContainer = _indexContainerMap.get(column);
- reuse.putField(column, IndexSegmentUtils
+ if (!fieldSpec.isVirtualColumn()) {
+ String columnName = fieldSpec.getName();
+ ColumnIndexContainer indexContainer =
_indexContainerMap.get(columnName);
+ reuse.putField(columnName, IndexSegmentUtils
.getValue(docId, fieldSpec, indexContainer.getForwardIndex(),
indexContainer.getDictionary(),
-
_segmentMetadata.getColumnMetadataFor(column).getMaxNumberOfMultiValues()));
+
_segmentMetadata.getColumnMetadataFor(columnName).getMaxNumberOfMultiValues()));
}
}
return reuse;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index e78ae37..d716835 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -130,9 +130,9 @@ public class ImmutableSegmentLoader {
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
// Instantiate virtual columns
- for (String columnName : schema.getColumnNames()) {
- if (schema.isVirtualColumn(columnName)) {
- FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ if (fieldSpec.isVirtualColumn()) {
+ String columnName = fieldSpec.getName();
VirtualColumnProvider provider =
VirtualColumnProviderFactory.buildProvider(fieldSpec.getVirtualColumnProvider());
VirtualColumnContext context = new
VirtualColumnContext(NetUtil.getHostnameOrAddress(), segmentName, columnName,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 8591219..0c7bd66 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -26,13 +26,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.common.segment.SegmentMetadata;
@@ -99,9 +100,11 @@ public class MutableSegmentImpl implements MutableSegment {
private volatile long _minTime = Long.MAX_VALUE;
private volatile long _maxTime = Long.MIN_VALUE;
private final int _numKeyColumns;
- private final Collection<FieldSpec> _physicalColumnFieldSpecs;
- private final Collection<FieldSpec> _physicalDimensionsFieldSpecs;
- private final Collection<FieldSpec> _physicalMetricsFieldSpecs;
+
+ // Cache the physical (non-virtual) field specs
+ private final Collection<FieldSpec> _physicalFieldSpecs;
+ private final Collection<DimensionFieldSpec> _physicalDimensionFieldSpecs;
+ private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
@@ -139,26 +142,27 @@ public class MutableSegmentImpl implements MutableSegment
{
_statsHistory = config.getStatsHistory();
_segmentPartitionConfig = config.getSegmentPartitionConfig();
- List<FieldSpec> physicalColumnFieldSpecs = new
ArrayList<>(_schema.getAllFieldSpecs());
- Iterator<FieldSpec> iterator = physicalColumnFieldSpecs.iterator();
- List<FieldSpec> physicalMetricsFieldSpecs = new
ArrayList<>(_schema.getMetricNames().size());
- List<FieldSpec> physicalDimensionsFieldSpecs = new
ArrayList<>(_schema.getDimensionNames().size());
- while (iterator.hasNext()) {
- FieldSpec fieldSpec = iterator.next();
- if (_schema.isVirtualColumn(fieldSpec.getName())) {
- iterator.remove();
- }
- if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
- physicalMetricsFieldSpecs.add(fieldSpec);
- }
- if (fieldSpec.getFieldType() == FieldSpec.FieldType.DIMENSION) {
- physicalDimensionsFieldSpecs.add(fieldSpec);
+ Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+ List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+ List<DimensionFieldSpec> physicalDimensionFieldSpecs = new
ArrayList<>(_schema.getDimensionNames().size());
+ List<MetricFieldSpec> physicalMetricFieldSpecs = new
ArrayList<>(_schema.getMetricNames().size());
+
+ for (FieldSpec fieldSpec : allFieldSpecs) {
+ if (!fieldSpec.isVirtualColumn()) {
+ physicalFieldSpecs.add(fieldSpec);
+
+ FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
+ if (fieldType == FieldSpec.FieldType.DIMENSION) {
+ physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
+ } else if (fieldType == FieldSpec.FieldType.METRIC) {
+ physicalMetricFieldSpecs.add((MetricFieldSpec) fieldSpec);
+ }
}
}
- _physicalColumnFieldSpecs =
Collections.unmodifiableCollection(physicalColumnFieldSpecs);
- _physicalDimensionsFieldSpecs =
Collections.unmodifiableCollection(physicalDimensionsFieldSpecs);
- _physicalMetricsFieldSpecs =
Collections.unmodifiableCollection(physicalMetricsFieldSpecs);
- _numKeyColumns = _physicalDimensionsFieldSpecs.size() + 1; // Add 1 for
time column
+ _physicalFieldSpecs =
Collections.unmodifiableCollection(physicalFieldSpecs);
+ _physicalDimensionFieldSpecs =
Collections.unmodifiableCollection(physicalDimensionFieldSpecs);
+ _physicalMetricFieldSpecs =
Collections.unmodifiableCollection(physicalMetricFieldSpecs);
+ _numKeyColumns = _physicalDimensionFieldSpecs.size() + 1; // Add 1 for
time column
_logger =
LoggerFactory.getLogger(MutableSegmentImpl.class.getName() + "_" +
_segmentName + "_" + config.getStreamName());
@@ -169,7 +173,7 @@ public class MutableSegmentImpl implements MutableSegment {
int avgNumMultiValues = config.getAvgNumMultiValues();
// Initialize for each column
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
_maxNumValuesMap.put(column, 0);
@@ -277,7 +281,7 @@ public class MutableSegmentImpl implements MutableSegment {
private Map<String, Object> updateDictionary(GenericRow row) {
Map<String, Object> dictIdMap = new HashMap<>();
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
Object value = row.getValue(column);
MutableDictionary dictionary = _dictionaryMap.get(column);
@@ -321,7 +325,7 @@ public class MutableSegmentImpl implements MutableSegment {
private void addForwardIndex(GenericRow row, int docId, Map<String, Object>
dictIdMap) {
// Store dictionary Id(s) for columns with dictionary
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
Object value = row.getValue(column);
if (fieldSpec.isSingleValueField()) {
@@ -363,7 +367,7 @@ public class MutableSegmentImpl implements MutableSegment {
// Update inverted index at last
// NOTE: inverted index have to be updated at last because once it gets
updated, the latest record will become
// queryable
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
RealtimeInvertedIndexReader invertedIndex =
_invertedIndexMap.get(column);
if (invertedIndex != null) {
@@ -380,16 +384,13 @@ public class MutableSegmentImpl implements MutableSegment
{
}
private boolean aggregateMetrics(GenericRow row, int docId) {
- for (FieldSpec metricSpec : _physicalMetricsFieldSpecs) {
- String column = metricSpec.getName();
+ for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
+ String column = metricFieldSpec.getName();
Object value = row.getValue(column);
- Preconditions.checkState(metricSpec.isSingleValueField(), "Multivalued
metrics cannot be updated.");
FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
(FixedByteSingleColumnSingleValueReaderWriter)
_indexReaderWriterMap.get(column);
- Preconditions.checkState(_dictionaryMap.get(column) == null, "Updating
metrics not supported with dictionary.");
- FieldSpec.DataType dataType = metricSpec.getDataType();
- // FIXME: this breaks for multi value metrics.
https://github.com/apache/incubator-pinot/issues/3867
+ FieldSpec.DataType dataType = metricFieldSpec.getDataType();
switch (dataType) {
case INT:
indexReaderWriter.setInt(docId, (Integer) value +
indexReaderWriter.getInt(docId));
@@ -436,7 +437,7 @@ public class MutableSegmentImpl implements MutableSegment {
public Set<String> getPhysicalColumnNames() {
HashSet<String> physicalColumnNames = new HashSet<>();
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
physicalColumnNames.add(fieldSpec.getName());
}
@@ -445,24 +446,21 @@ public class MutableSegmentImpl implements MutableSegment
{
@Override
public ColumnDataSource getDataSource(String columnName) {
- if (!_schema.isVirtualColumn(columnName)) {
- return new ColumnDataSource(_schema.getFieldSpecFor(columnName),
_numDocsIndexed,
- _maxNumValuesMap.get(columnName),
_indexReaderWriterMap.get(columnName), _invertedIndexMap.get(columnName),
- _dictionaryMap.get(columnName), _bloomFilterMap.get(columnName));
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+ if (fieldSpec.isVirtualColumn()) {
+ VirtualColumnContext virtualColumnContext =
+ new VirtualColumnContext(NetUtil.getHostnameOrAddress(),
getSegmentName(), columnName, _numDocsIndexed + 1);
+ VirtualColumnProvider virtualColumnProvider =
+
VirtualColumnProviderFactory.buildProvider(_schema.getFieldSpecFor(columnName).getVirtualColumnProvider());
+ return new
ColumnDataSource(virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext),
+ virtualColumnProvider.buildMetadata(virtualColumnContext));
} else {
- return getVirtualDataSource(columnName);
+ return new ColumnDataSource(fieldSpec, _numDocsIndexed,
_maxNumValuesMap.get(columnName),
+ _indexReaderWriterMap.get(columnName),
_invertedIndexMap.get(columnName), _dictionaryMap.get(columnName),
+ _bloomFilterMap.get(columnName));
}
}
- private ColumnDataSource getVirtualDataSource(String column) {
- VirtualColumnContext virtualColumnContext =
- new VirtualColumnContext(NetUtil.getHostnameOrAddress(),
getSegmentName(), column, _numDocsIndexed + 1);
- VirtualColumnProvider provider =
-
VirtualColumnProviderFactory.buildProvider(_schema.getFieldSpecFor(column).getVirtualColumnProvider());
- return new
ColumnDataSource(provider.buildColumnIndexContainer(virtualColumnContext),
- provider.buildMetadata(virtualColumnContext));
- }
-
@Override
public List<StarTreeV2> getStarTrees() {
return null;
@@ -475,10 +473,10 @@ public class MutableSegmentImpl implements MutableSegment
{
* @return Generic row with physical columns of the specified row.
*/
public GenericRow getRecord(int docId, GenericRow reuse) {
- for (FieldSpec fieldSpec : _physicalColumnFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
- reuse.putField(column,
- IndexSegmentUtils.getValue(docId, fieldSpec,
_indexReaderWriterMap.get(column), _dictionaryMap.get(column),
+ reuse.putField(column, IndexSegmentUtils
+ .getValue(docId, fieldSpec, _indexReaderWriterMap.get(column),
_dictionaryMap.get(column),
_maxNumValuesMap.getOrDefault(column, 0)));
}
return reuse;
@@ -604,7 +602,7 @@ public class MutableSegmentImpl implements MutableSegment {
int[] dictIds = new int[_numKeyColumns]; // dimensions + time column.
// FIXME: this for loop breaks for multi value dimensions.
https://github.com/apache/incubator-pinot/issues/3867
- for (FieldSpec fieldSpec : _physicalDimensionsFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalDimensionFieldSpecs) {
dictIds[i++] = (Integer) dictIdMap.get(fieldSpec.getName());
}
@@ -632,7 +630,8 @@ public class MutableSegmentImpl implements MutableSegment {
*
* @return Map from dictionary id array to doc id, null if metrics
aggregation cannot be enabled.
*/
- private IdMap<FixedIntArray>
enableMetricsAggregationIfPossible(RealtimeSegmentConfig config, Set<String>
noDictionaryColumns) {
+ private IdMap<FixedIntArray>
enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,
+ Set<String> noDictionaryColumns) {
_aggregateMetrics = config.aggregateMetrics();
if (!_aggregateMetrics) {
_logger.info("Metrics aggregation is disabled.");
@@ -641,7 +640,7 @@ public class MutableSegmentImpl implements MutableSegment {
// All metric columns should have no-dictionary index.
// All metric columns must be single value
- for (FieldSpec fieldSpec : _physicalMetricsFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalMetricFieldSpecs) {
String metric = fieldSpec.getName();
if (!noDictionaryColumns.contains(metric)) {
_logger
@@ -660,7 +659,7 @@ public class MutableSegmentImpl implements MutableSegment {
// All dimension columns should be dictionary encoded.
// All dimension columns must be single value
- for (FieldSpec fieldSpec : _physicalDimensionsFieldSpecs) {
+ for (FieldSpec fieldSpec : _physicalDimensionFieldSpecs) {
String dimension = fieldSpec.getName();
if (noDictionaryColumns.contains(dimension)) {
_logger
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index c497c61..1cc8775 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -118,13 +118,12 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
// Initialize creators for dictionary, forward index and inverted index
for (FieldSpec fieldSpec : fieldSpecs) {
- String columnName = fieldSpec.getName();
-
// Ignore virtual columns
- if (schema.isVirtualColumn(columnName)) {
+ if (fieldSpec.isVirtualColumn()) {
continue;
}
+ String columnName = fieldSpec.getName();
ColumnIndexCreationInfo indexCreationInfo =
indexCreationInfoMap.get(columnName);
Preconditions.checkNotNull(indexCreationInfo, "Missing index creation
info for column: %s", columnName);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 7d4cd9b..95a37cc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -475,21 +475,19 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
*/
void buildIndexCreationInfo()
throws Exception {
- for (FieldSpec spec : dataSchema.getAllFieldSpecs()) {
- String column = spec.getName();
-
- // Skip adding virtual columns, so that they don't get an on-disk
representation
- if (dataSchema.isVirtualColumn(column)) {
+ for (FieldSpec fieldSpec : dataSchema.getAllFieldSpecs()) {
+ // Ignore virtual columns
+ if (fieldSpec.isVirtualColumn()) {
continue;
}
- ColumnStatistics columnProfile =
segmentStats.getColumnProfileFor(column);
+ String columnName = fieldSpec.getName();
+ ColumnStatistics columnProfile =
segmentStats.getColumnProfileFor(columnName);
Set<String> varLengthDictionaryColumns = new
HashSet<>(config.getVarLengthDictionaryColumns());
- indexCreationInfoMap.put(column,
- new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/,
- varLengthDictionaryColumns.contains(column),
ForwardIndexType.FIXED_BIT_COMPRESSED,
- InvertedIndexType.ROARING_BITMAPS, false/*isAutoGenerated*/,
- dataSchema.getFieldSpecFor(column).getDefaultNullValue()));
+ indexCreationInfoMap.put(columnName, new
ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/,
+ varLengthDictionaryColumns.contains(columnName),
ForwardIndexType.FIXED_BIT_COMPRESSED,
+ InvertedIndexType.ROARING_BITMAPS, false/*isAutoGenerated*/,
+ dataSchema.getFieldSpecFor(columnName).getDefaultNullValue()));
}
segmentIndexCreationInfo.setTotalDocs(totalDocs);
segmentIndexCreationInfo.setTotalRawDocs(totalRawDocs);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnProviderFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnProviderFactory.java
index 963ae89..8f58599 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnProviderFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnProviderFactory.java
@@ -29,11 +29,9 @@ import org.apache.pinot.common.data.Schema;
public class VirtualColumnProviderFactory {
public static VirtualColumnProvider buildProvider(String
virtualColumnProvider) {
try {
- Class<? extends VirtualColumnProvider> providerClass =
- (Class<? extends VirtualColumnProvider>)
Class.forName(virtualColumnProvider);
- return providerClass.newInstance();
+ return (VirtualColumnProvider)
Class.forName(virtualColumnProvider).newInstance();
} catch (ReflectiveOperationException e) {
- return null;
+ throw new IllegalStateException("Caught exception while creating
instance of: " + virtualColumnProvider, e);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 7bd81d6..121681c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -26,7 +26,9 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
@@ -51,15 +53,21 @@ public class MutableSegmentImplAggregateMetricsTest {
public void setUp() {
Schema schema = new Schema.SchemaBuilder().setSchemaName("testSchema")
.addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
- .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING)
- .addMetric(METRIC, FieldSpec.DataType.LONG)
- .addMetric(METRIC_2, FieldSpec.DataType.FLOAT)
- .addTime(TIME_COLUMN, TimeUnit.DAYS, FieldSpec.DataType.INT)
- .build();
+ .addSingleValueDimension(DIMENSION_2,
FieldSpec.DataType.STRING).addMetric(METRIC, FieldSpec.DataType.LONG)
+ .addMetric(METRIC_2, FieldSpec.DataType.FLOAT).addTime(TIME_COLUMN,
TimeUnit.DAYS, FieldSpec.DataType.INT)
+ .build();
+ // Add virtual columns, which should not be aggregated
+ DimensionFieldSpec virtualDimensionFieldSpec =
+ new DimensionFieldSpec("$virtualDimension", FieldSpec.DataType.INT,
true, Object.class);
+ schema.addField(virtualDimensionFieldSpec);
+ MetricFieldSpec virtualMetricFieldSpec = new
MetricFieldSpec("$virtualMetric", FieldSpec.DataType.INT);
+ virtualMetricFieldSpec.setVirtualColumnProvider("provider.class");
+ schema.addField(virtualMetricFieldSpec);
+
_mutableSegmentImpl = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(schema, new
HashSet<>(Arrays.asList(DIMENSION_1, METRIC, METRIC_2)),
- new HashSet<>(Arrays.asList(DIMENSION_1)),
- Collections.singleton(DIMENSION_1), true);
+ .createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(METRIC,
METRIC_2)),
+ Collections.singleton(DIMENSION_2), new
HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN)),
+ true);
}
@Test
@@ -111,8 +119,8 @@ public class MutableSegmentImplAggregateMetricsTest {
}
private String buildKey(GenericRow row) {
- return String.valueOf(row.getValue(DIMENSION_1)) + KEY_SEPARATOR +
- row.getValue(DIMENSION_2) + KEY_SEPARATOR + row.getValue(TIME_COLUMN);
+ return row.getValue(DIMENSION_1) + KEY_SEPARATOR +
row.getValue(DIMENSION_2) + KEY_SEPARATOR + row
+ .getValue(TIME_COLUMN);
}
@AfterClass
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
index 4718fa4..9946d98 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -104,26 +104,31 @@ public class SegmentInfoProvider {
}
IndexSegment indexSegment = ImmutableSegmentLoader.load(segmentDir,
ReadMode.heap);
- Schema schema = indexSegment.getSegmentMetadata().getSchema();
- // Add time column if exists.
- String timeColumn = schema.getTimeColumnName();
- if (timeColumn != null) {
- uniqueSingleValueDimensions.add(timeColumn);
- loadValuesForSingleValueDimension(indexSegment,
singleValueDimensionValuesMap, timeColumn);
- }
-
- // Add all metric columns.
- uniqueMetrics.addAll(schema.getMetricNames());
-
- // Add all single-value dimension columns.
- for (DimensionFieldSpec fieldSpec : schema.getDimensionFieldSpecs()) {
- if (!fieldSpec.isSingleValueField() ||
schema.isVirtualColumn(fieldSpec.getName())) {
- continue;
+ try {
+ Schema schema = indexSegment.getSegmentMetadata().getSchema();
+
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ // Ignore virtual columns and multi-value columns
+ if (fieldSpec.isVirtualColumn() || !fieldSpec.isSingleValueField()) {
+ continue;
+ }
+
+ String columnName = fieldSpec.getName();
+ FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
+ switch (fieldType) {
+ // Treat TIME column as single-value dimension column
+ case DIMENSION:
+ case TIME:
+ uniqueSingleValueDimensions.add(columnName);
+ loadValuesForSingleValueDimension(indexSegment,
singleValueDimensionValuesMap, columnName);
+ break;
+ case METRIC:
+ uniqueMetrics.add(columnName);
+ }
}
- String column = fieldSpec.getName();
- uniqueSingleValueDimensions.add(column);
- loadValuesForSingleValueDimension(indexSegment,
singleValueDimensionValuesMap, column);
+ } finally {
+ indexSegment.destroy();
}
if (tmpDir != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]