This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ecba1cc262 Support for Virtual DataSource (#15350)
ecba1cc262 is described below
commit ecba1cc2621bad58e59b3ff1b1c2f2fbc79c5b53
Author: Praveen <[email protected]>
AuthorDate: Fri Jun 13 15:37:51 2025 -0700
Support for Virtual DataSource (#15350)
---
.../query/pruner/BloomFilterSegmentPruner.java | 48 +++---
.../query/pruner/ColumnValueSegmentPruner.java | 30 ++--
.../core/query/pruner/SegmentPrunerService.java | 19 +--
.../query/pruner/SelectionQuerySegmentPruner.java | 3 +-
.../core/query/pruner/ValueBasedSegmentPruner.java | 19 ++-
.../query/pruner/BloomFilterSegmentPrunerTest.java | 4 +-
.../query/pruner/ColumnValueSegmentPrunerTest.java | 9 +-
.../query/pruner/SegmentPrunerServiceTest.java | 20 ---
.../pruner/SelectionQuerySegmentPrunerTest.java | 12 +-
.../tests/OfflineClusterIntegrationTest.java | 168 +++++++++++++++++++++
.../integration/tests/custom/MapTypeTest.java | 4 +-
.../IndexSegmentUtils.java} | 33 ++--
.../indexsegment/immutable/EmptyIndexSegment.java | 17 ++-
.../immutable/ImmutableSegmentImpl.java | 16 ++
.../indexsegment/mutable/MutableSegmentImpl.java | 20 ++-
.../DefaultNullValueVirtualColumnProvider.java | 12 +-
.../segment/index/datasource/EmptyDataSource.java | 9 +-
.../virtualcolumn/DocIdVirtualColumnProvider.java | 23 +--
.../virtualcolumn/VirtualColumnProvider.java | 14 +-
.../org/apache/pinot/segment/spi/IndexSegment.java | 9 +-
20 files changed, 349 insertions(+), 140 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
index 6277aac8eb..8fa24b2740 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
@@ -49,7 +49,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException;
* The {@code BloomFilterSegmentPruner} prunes segments based on bloom filter
for EQUALITY filter. Because the access
* to bloom filter data is required, segment pruning is done in parallel when
the number of segments is large.
*/
-@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
public class BloomFilterSegmentPruner extends ValueBasedSegmentPruner {
// Try to schedule 10 segments for each thread, or evenly distribute them to
all MAX_NUM_THREADS_PER_QUERY threads.
// TODO: make this threshold configurable? threshold 10 is also used in
CombinePlanNode, which accesses the
@@ -77,6 +76,7 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
if (predicateType == Predicate.Type.IN) {
List<String> values = ((InPredicate) predicate).getValues();
// Skip pruning when there are too many values in the IN predicate
+ //noinspection RedundantIfStatement
if (values.size() <= _inPredicateThreshold) {
return true;
}
@@ -101,7 +101,7 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
for (int i = 0; i < numSegments; i++) {
dataSourceCache.clear();
IndexSegment segment = segments.get(i);
- if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter,
dataSourceCache, cachedValues)) {
+ if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter,
dataSourceCache, cachedValues, query)) {
selectedSegments.add(segment);
}
}
@@ -131,20 +131,20 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
fetchContexts -> pruneInParallel(numTasks, segments, query,
executorService, fetchContexts));
}
- private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment>
segments, QueryContext queryContext,
- ExecutorService executorService, FetchContext[] fetchContexts) {
+ private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment>
segments, QueryContext query,
+ ExecutorService executorService, @Nullable FetchContext[] fetchContexts)
{
int numSegments = segments.size();
List<IndexSegment> allSelectedSegments = new ArrayList<>();
QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
- FilterContext filter = Objects.requireNonNull(queryContext.getFilter());
+ FilterContext filter = Objects.requireNonNull(query.getFilter());
ValueCache cachedValues = new ValueCache();
Map<String, DataSource> dataSourceCache = new HashMap<>();
List<IndexSegment> selectedSegments = new ArrayList<>();
for (int i = index; i < numSegments; i += numTasks) {
dataSourceCache.clear();
IndexSegment segment = segments.get(i);
- FetchContext fetchContext = fetchContexts == null ? null :
fetchContexts[i];
- if (!pruneSegmentWithFetchContext(segment, fetchContext, filter,
dataSourceCache, cachedValues)) {
+ FetchContext fetchContext = fetchContexts != null ? fetchContexts[i] :
null;
+ if (!pruneSegmentWithFetchContext(segment, fetchContext, filter,
dataSourceCache, cachedValues, query)) {
selectedSegments.add(segment);
}
}
@@ -158,7 +158,7 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
throw new QueryCancelledException("Cancelled while running
BloomFilterSegmentPruner", e);
}
throw new RuntimeException("Caught exception while running
BloomFilterSegmentPruner", e);
- }, executorService, queryContext.getEndTimeMs());
+ }, executorService, query.getEndTimeMs());
return allSelectedSegments;
}
@@ -188,14 +188,14 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
}
}
- private boolean pruneSegmentWithFetchContext(IndexSegment segment,
FetchContext fetchContext, FilterContext filter,
- Map<String, DataSource> dataSourceCache, ValueCache cachedValues) {
+ private boolean pruneSegmentWithFetchContext(IndexSegment segment, @Nullable
FetchContext fetchContext,
+ FilterContext filter, Map<String, DataSource> dataSourceCache,
ValueCache cachedValues, QueryContext query) {
if (fetchContext == null) {
- return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+ return pruneSegment(segment, filter, dataSourceCache, cachedValues,
query);
}
+ segment.acquire(fetchContext);
try {
- segment.acquire(fetchContext);
- return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+ return pruneSegment(segment, filter, dataSourceCache, cachedValues,
query);
} finally {
segment.release(fetchContext);
}
@@ -203,7 +203,7 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
@Override
boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate,
Map<String, DataSource> dataSourceCache,
- ValueCache cachedValues) {
+ ValueCache cachedValues, QueryContext query) {
Predicate.Type predicateType = predicate.getType();
if (predicateType == Predicate.Type.EQ) {
return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues);
@@ -220,10 +220,12 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
private boolean pruneEqPredicate(IndexSegment segment, EqPredicate
eqPredicate,
Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
String column = eqPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
- assert dataSource != null;
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSourceNullable(column)
+ : dataSourceCache.computeIfAbsent(column,
segment::getDataSourceNullable);
+ if (dataSource == null) {
+ // Column does not exist, cannot prune
+ return false;
+ }
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate,
dataSourceMetadata.getDataType());
// Check bloom filter
@@ -243,10 +245,12 @@ public class BloomFilterSegmentPruner extends
ValueBasedSegmentPruner {
return false;
}
String column = inPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
- assert dataSource != null;
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSourceNullable(column)
+ : dataSourceCache.computeIfAbsent(column,
segment::getDataSourceNullable);
+ if (dataSource == null) {
+ // Column does not exist, cannot prune
+ return false;
+ }
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate,
dataSourceMetadata.getDataType());
// Check bloom filter
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 8eead9f552..7b5a4b507b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -26,6 +26,7 @@ import
org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -78,14 +79,14 @@ public class ColumnValueSegmentPruner extends
ValueBasedSegmentPruner {
@Override
boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate,
Map<String, DataSource> dataSourceCache,
- ValueCache cachedValues) {
+ ValueCache cachedValues, QueryContext query) {
Predicate.Type predicateType = predicate.getType();
if (predicateType == Predicate.Type.EQ) {
- return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues);
+ return pruneEqPredicate(segment, (EqPredicate) predicate,
dataSourceCache, cachedValues, query);
} else if (predicateType == Predicate.Type.IN) {
- return pruneInPredicate(segment, (InPredicate) predicate,
dataSourceCache, cachedValues);
+ return pruneInPredicate(segment, (InPredicate) predicate,
dataSourceCache, cachedValues, query);
} else if (predicateType == Predicate.Type.RANGE) {
- return pruneRangePredicate(segment, (RangePredicate) predicate,
dataSourceCache);
+ return pruneRangePredicate(segment, (RangePredicate) predicate,
dataSourceCache, query);
} else {
return false;
}
@@ -99,11 +100,10 @@ public class ColumnValueSegmentPruner extends
ValueBasedSegmentPruner {
* </ul>
*/
private boolean pruneEqPredicate(IndexSegment segment, EqPredicate
eqPredicate,
- Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+ Map<String, DataSource> dataSourceCache, ValueCache valueCache,
QueryContext query) {
String column = eqPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column, query.getSchema())
+ : dataSourceCache.computeIfAbsent(column, col ->
segment.getDataSource(column, query.getSchema()));
assert dataSource != null;
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate,
dataSourceMetadata.getDataType());
@@ -131,16 +131,15 @@ public class ColumnValueSegmentPruner extends
ValueBasedSegmentPruner {
* <p>NOTE: segments will not be pruned if the number of values is greater
than the threshold.
*/
private boolean pruneInPredicate(IndexSegment segment, InPredicate
inPredicate,
- Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+ Map<String, DataSource> dataSourceCache, ValueCache valueCache,
QueryContext query) {
List<String> values = inPredicate.getValues();
// Skip pruning when there are too many values in the IN predicate
if (values.size() > _inPredicateThreshold) {
return false;
}
String column = inPredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column, query.getSchema())
+ : dataSourceCache.computeIfAbsent(column, col ->
segment.getDataSource(column, query.getSchema()));
assert dataSource != null;
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate,
dataSourceMetadata.getDataType());
@@ -160,11 +159,10 @@ public class ColumnValueSegmentPruner extends
ValueBasedSegmentPruner {
* </ul>
*/
private boolean pruneRangePredicate(IndexSegment segment, RangePredicate
rangePredicate,
- Map<String, DataSource> dataSourceCache) {
+ Map<String, DataSource> dataSourceCache, QueryContext query) {
String column = rangePredicate.getLhs().getIdentifier();
- DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column)
- : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
- // NOTE: Column must exist after DataSchemaSegmentPruner
+ DataSource dataSource = segment instanceof ImmutableSegment ?
segment.getDataSource(column, query.getSchema())
+ : dataSourceCache.computeIfAbsent(column, col ->
segment.getDataSource(column, query.getSchema()));
assert dataSource != null;
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index 4b775f67bd..a426cb7abe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -105,7 +105,7 @@ public class SegmentPrunerService {
public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext
query, SegmentPrunerStatistics stats,
@Nullable ExecutorService executorService) {
try (InvocationScope scope =
Tracing.getTracer().createScope(SegmentPrunerService.class)) {
- segments = removeInvalidSegments(segments, query, stats);
+ segments = removeEmptySegments(segments);
int invokedPrunersCount = 0;
for (SegmentPruner segmentPruner : _segmentPruners) {
if (segmentPruner.isApplicableTo(query)) {
@@ -124,7 +124,7 @@ public class SegmentPrunerService {
}
/**
- * Filters the given list, returning a list that only contains the valid
segments, modifying the list received as
+ * Filters the given list, returning a list that only contains the non-empty
segments, modifying the list received as
* argument.
*
* <p>
@@ -136,28 +136,17 @@ public class SegmentPrunerService {
* undefined way. Therefore, this list should not be used
after calling this method.
* @return the new list with filtered elements. This is the list that have
to be used.
*/
- private static List<IndexSegment> removeInvalidSegments(List<IndexSegment>
segments, QueryContext query,
- SegmentPrunerStatistics stats) {
+ private static List<IndexSegment> removeEmptySegments(List<IndexSegment>
segments) {
int selected = 0;
- int invalid = 0;
for (IndexSegment segment : segments) {
if (!isEmptySegment(segment)) {
- if (isInvalidSegment(segment, query)) {
- invalid++;
- } else {
- segments.set(selected++, segment);
- }
+ segments.set(selected++, segment);
}
}
- stats.setInvalidSegments(invalid);
return segments.subList(0, selected);
}
private static boolean isEmptySegment(IndexSegment segment) {
return segment.getSegmentMetadata().getTotalDocs() == 0;
}
-
- private static boolean isInvalidSegment(IndexSegment segment, QueryContext
query) {
- return !segment.getColumnNames().containsAll(query.getColumns());
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 119f128fbb..6f2e29a224 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -126,7 +126,8 @@ public class SelectionQuerySegmentPruner implements
SegmentPruner {
List<MinMaxValue> minMaxValues = new ArrayList<>(numSegments);
for (int i = 0; i < numSegments; i++) {
IndexSegment segment = segments.get(i);
- DataSourceMetadata dataSourceMetadata =
segment.getDataSource(firstOrderByColumn).getDataSourceMetadata();
+ DataSourceMetadata dataSourceMetadata =
+ segment.getDataSource(firstOrderByColumn,
query.getSchema()).getDataSourceMetadata();
Comparable minValue = dataSourceMetadata.getMinValue();
Comparable maxValue = dataSourceMetadata.getMaxValue();
// Always keep the segment if it does not have column min/max value in
the metadata
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
index 2e3214c8c2..14dd4e4764 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -43,7 +43,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Server;
/**
* The {@code ValueBasedSegmentPruner} prunes segments based on values inside
the filter and segment metadata and data.
*/
-@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+@SuppressWarnings({"rawtypes", "unchecked"})
abstract public class ValueBasedSegmentPruner implements SegmentPruner {
public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
protected int _inPredicateThreshold;
@@ -72,6 +72,7 @@ abstract public class ValueBasedSegmentPruner implements
SegmentPruner {
private boolean isApplicableToFilter(FilterContext filter) {
switch (filter.getType()) {
case AND:
+ assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
if (isApplicableToFilter(child)) {
return true;
@@ -79,6 +80,7 @@ abstract public class ValueBasedSegmentPruner implements
SegmentPruner {
}
return false;
case OR:
+ assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
if (!isApplicableToFilter(child)) {
return false;
@@ -108,7 +110,7 @@ abstract public class ValueBasedSegmentPruner implements
SegmentPruner {
List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
for (IndexSegment segment : segments) {
dataSourceCache.clear();
- if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+ if (!pruneSegment(segment, filter, dataSourceCache, cachedValues,
query)) {
selectedSegments.add(segment);
}
}
@@ -116,18 +118,20 @@ abstract public class ValueBasedSegmentPruner implements
SegmentPruner {
}
protected boolean pruneSegment(IndexSegment segment, FilterContext filter,
Map<String, DataSource> dataSourceCache,
- ValueCache cachedValues) {
+ ValueCache cachedValues, QueryContext query) {
switch (filter.getType()) {
case AND:
+ assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
- if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+ if (pruneSegment(segment, child, dataSourceCache, cachedValues,
query)) {
return true;
}
}
return false;
case OR:
+ assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
- if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+ if (!pruneSegment(segment, child, dataSourceCache, cachedValues,
query)) {
return false;
}
}
@@ -137,18 +141,19 @@ abstract public class ValueBasedSegmentPruner implements
SegmentPruner {
return false;
case PREDICATE:
Predicate predicate = filter.getPredicate();
+ assert predicate != null;
// Only prune columns
if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER)
{
return false;
}
- return pruneSegmentWithPredicate(segment, predicate, dataSourceCache,
cachedValues);
+ return pruneSegmentWithPredicate(segment, predicate, dataSourceCache,
cachedValues, query);
default:
throw new IllegalStateException();
}
}
abstract boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate
predicate,
- Map<String, DataSource> dataSourceCache, ValueCache cachedValues);
+ Map<String, DataSource> dataSourceCache, ValueCache cachedValues,
QueryContext query);
protected static Comparable convertValue(String stringValue, DataType
dataType) {
try {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
index 75b2beea0f..93b6552676 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
@@ -101,7 +101,7 @@ public class BloomFilterSegmentPrunerTest {
throws IOException {
IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0",
"3.0", "5.0", "7.0", "21.0"});
DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+ when(indexSegment.getDataSourceNullable("column")).thenReturn(dataSource);
runPruner(Collections.singletonList(indexSegment),
"SELECT COUNT(*) FROM testTable WHERE column = 5.0 OR column = 0.0",
1);
}
@@ -169,7 +169,7 @@ public class BloomFilterSegmentPrunerTest {
when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+ when(indexSegment.getDataSourceNullable("column")).thenReturn(dataSource);
// Add support for bloom filter
DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
BloomFilterReaderBuilder builder = new BloomFilterReaderBuilder();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index 5a5d7bd505..ffb74cec8c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -31,10 +31,13 @@ import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
@@ -58,8 +61,7 @@ public class ColumnValueSegmentPrunerTest {
IndexSegment indexSegment = mockIndexSegment();
DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource("column")).thenReturn(dataSource);
-
+ when(indexSegment.getDataSource(eq("column"),
any(Schema.class))).thenReturn(dataSource);
DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
when(dataSourceMetadata.getDataType()).thenReturn(DataType.INT);
when(dataSourceMetadata.getMinValue()).thenReturn(10);
@@ -115,7 +117,7 @@ public class ColumnValueSegmentPrunerTest {
IndexSegment indexSegment = mockIndexSegment();
DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+ when(indexSegment.getDataSource(eq("column"),
any(Schema.class))).thenReturn(dataSource);
DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
when(dataSourceMetadata.getDataType()).thenReturn(DataType.INT);
@@ -190,6 +192,7 @@ public class ColumnValueSegmentPrunerTest {
private boolean runPruner(IndexSegment indexSegment, String query) {
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
+ queryContext.setSchema(mock(Schema.class));
return PRUNER.prune(Arrays.asList(indexSegment), queryContext).isEmpty();
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
index 62da031fd4..c18c670c57 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
@@ -87,26 +87,6 @@ public class SegmentPrunerServiceTest {
Assert.assertEquals(stats.getInvalidSegments(), 0);
}
- @Test
- public void segmentsWithoutColumnAreInvalid() {
- SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
- IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
-
- SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
-
- List<IndexSegment> indexes = new ArrayList<>();
- indexes.add(indexSegment);
-
- String query = "select not_present from t1";
-
- QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
-
- List<IndexSegment> actual = service.prune(indexes, queryContext, stats);
-
- Assert.assertEquals(actual, Collections.emptyList());
- Assert.assertEquals(1, stats.getInvalidSegments());
- }
-
private IndexSegment mockIndexSegment(int totalDocs, String... columns) {
IndexSegment indexSegment = mock(IndexSegment.class);
when(indexSegment.getColumnNames()).thenReturn(new
HashSet<>(Arrays.asList(columns)));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index 63afba81f9..2f65ef11af 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -29,8 +29,11 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.Schema;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -110,9 +113,11 @@ public class SelectionQuerySegmentPrunerTest {
getIndexSegment(5L, 10L, 10), // 6
getIndexSegment(15L, 30L, 15)); // 7
+ Schema schema = mock(Schema.class);
// Should keep segments: [null, null], [-5, 5], [0, 10]
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable
ORDER BY testColumn LIMIT 5");
+ queryContext.setSchema(schema);
List<IndexSegment> result = _segmentPruner.prune(indexSegments,
queryContext);
assertEquals(result.size(), 3);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -122,6 +127,7 @@ public class SelectionQuerySegmentPrunerTest {
// Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15]
queryContext =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable
ORDER BY testColumn LIMIT 15, 20");
+ queryContext.setSchema(schema);
result = _segmentPruner.prune(indexSegments, queryContext);
assertEquals(result.size(), 5);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -134,6 +140,7 @@ public class SelectionQuerySegmentPrunerTest {
// Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15],
[15, 30], [15, 50]
queryContext =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable
ORDER BY testColumn, foo LIMIT 40");
+ queryContext.setSchema(schema);
result = _segmentPruner.prune(indexSegments, queryContext);
assertEquals(result.size(), 7);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -148,6 +155,7 @@ public class SelectionQuerySegmentPrunerTest {
// Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
queryContext =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable
ORDER BY testColumn DESC LIMIT 5");
+ queryContext.setSchema(schema);
result = _segmentPruner.prune(indexSegments, queryContext);
assertEquals(result.size(), 4);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -159,6 +167,7 @@ public class SelectionQuerySegmentPrunerTest {
// Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
queryContext = QueryContextConverterUtils
.getQueryContext("SELECT * FROM testTable ORDER BY testColumn DESC
LIMIT 5, 30");
+ queryContext.setSchema(schema);
result = _segmentPruner.prune(indexSegments, queryContext);
assertEquals(result.size(), 4);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -170,6 +179,7 @@ public class SelectionQuerySegmentPrunerTest {
// Should keep segments: [null, null], [20, 30], [15, 50], [15, 30], [5,
15], [5, 10], [0, 10], [-5, 5]
queryContext = QueryContextConverterUtils
.getQueryContext("SELECT * FROM testTable ORDER BY testColumn DESC,
foo LIMIT 60");
+ queryContext.setSchema(schema);
result = _segmentPruner.prune(indexSegments, queryContext);
assertEquals(result.size(), 8);
assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -210,7 +220,7 @@ public class SelectionQuerySegmentPrunerTest {
IndexSegment indexSegment = mock(IndexSegment.class);
when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("foo",
"testColumn"));
DataSource dataSource = mock(DataSource.class);
- when(indexSegment.getDataSource(ORDER_BY_COLUMN)).thenReturn(dataSource);
+ when(indexSegment.getDataSource(eq(ORDER_BY_COLUMN),
any(Schema.class))).thenReturn(dataSource);
DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 1c87c1eee3..7ce615a3b1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -41,6 +42,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.Header;
@@ -56,6 +58,7 @@ import org.apache.pinot.client.PinotDriver;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
@@ -76,6 +79,7 @@ import
org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -4086,4 +4090,168 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
fail("Segment reload failed with exception: " + e.getMessage());
}
}
+
+ @Test
+ public void testVirtualColumnWithPartialReload() throws Exception {
+ Schema oldSchema = getSchema(DEFAULT_SCHEMA_NAME);
+ // Pick any existing INT column name for the "valid" cases
+ String validColumnName = oldSchema.getAllFieldSpecs().stream()
+ .filter(fs -> fs.getDataType() == DataType.INT)
+ .findFirst()
+ .get()
+ .getName();
+ // New column name that is not in the schema
+ String newColumn = "newColumn";
+ DataType newdataType = DataType.INT;
+ // Test queries
+ List<String> queries = List.of(
+ SELECT_STAR_QUERY,
+ SELECT_STAR_QUERY + " WHERE " + validColumnName + " > 0 LIMIT 10000",
+ SELECT_STAR_QUERY + " ORDER BY " + validColumnName + " LIMIT 10000",
+ SELECT_STAR_QUERY + " ORDER BY " + newColumn + " LIMIT 10000",
+ "SELECT " + newColumn + " FROM " + DEFAULT_TABLE_NAME
+ );
+ for (String query: queries) {
+ try {
+ // Build new schema with the extra column
+ Schema newSchema = new Schema();
+ newSchema.setSchemaName(oldSchema.getSchemaName());
+ for (FieldSpec fs : oldSchema.getAllFieldSpecs()) {
+ newSchema.addField(fs);
+ }
+ FieldSpec newFieldSpec = new DimensionFieldSpec(newColumn,
newdataType, true);
+ newSchema.addField(newFieldSpec);
+ updateSchema(newSchema);
+
+ // Partially reload one segment
+ reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE",
+ listSegments(DEFAULT_TABLE_NAME + "_OFFLINE").get(0));
+ // Column should show since it would be added as a virtual column
+ runQueryAndAssert(query, newColumn, newFieldSpec);
+ // Now do a full reload and the column should still be there,
indicating there is no regression
+ reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+ runQueryAndAssert(query, newColumn, newFieldSpec);
+ } finally {
+ // Reset back to the original schema for the next iteration
+ forceUpdateSchema(oldSchema);
+ reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+ }
+ }
+ }
+
+ @Test
+ public void testVirtualColumnAfterReloadForDifferentDataTypes() throws
Exception {
+ Schema oldSchema = getSchema(DEFAULT_SCHEMA_NAME);
+ try {
+ // Build a new schema: copy everything, then add one virtual column per
DataType.
+ Schema newSchema = new Schema();
+ oldSchema.getAllFieldSpecs().forEach(newSchema::addField);
+ // Keep insertion order - helps when debugging.
+ Map<String, FieldSpec> newCols = new LinkedHashMap<>();
+ List<DataType> newDataTypes = List.of(
+ DataType.INT, DataType.LONG, DataType.FLOAT, DataType.DOUBLE,
+ DataType.STRING, DataType.BOOLEAN, DataType.BYTES);
+ for (DataType dt : newDataTypes) {
+ String col = "col_" + dt.name().toLowerCase();
+ FieldSpec fs = new DimensionFieldSpec(col, dt, true);
+ newCols.put(col, fs);
+ newSchema.addField(fs);
+ }
+ newSchema.setSchemaName(oldSchema.getSchemaName());
+ updateSchema(newSchema);
+
+ // Reload segment.
+ reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE",
listSegments(DEFAULT_TABLE_NAME + "_OFFLINE").get(0));
+
+ JsonNode res = postQuery("SELECT * FROM " + DEFAULT_TABLE_NAME + " LIMIT
5000");
+ assertNoError(res);
+ JsonNode rows = res.get("resultTable").get("rows");
+ DataSchema resultSchema =
+ JsonUtils.jsonNodeToObject(res.get("resultTable").get("dataSchema"),
DataSchema.class);
+
+ // Verify each new column.
+ for (Map.Entry<String, FieldSpec> e : newCols.entrySet()) {
+ String col = e.getKey();
+ FieldSpec fs = e.getValue();
+
+ int idx = IntStream.range(0, resultSchema.size())
+ .filter(i -> resultSchema.getColumnName(i).equals(col))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Column " + col + "
missing"));
+
+ Assert.assertEquals(resultSchema.getColumnDataType(idx).name(),
fs.getDataType().name(),
+ "Mismatch in reported type for " + col);
+
+ for (JsonNode row : rows) {
+ String expectedDefault;
+ if (fs.getDataType() == DataType.BOOLEAN) {
+ // Pinot surfaces boolean default nulls as literal "false"
+ expectedDefault = "false";
+ } else {
+ expectedDefault = fs.getDefaultNullValueString();
+ }
+ Assert.assertEquals(row.get(idx).asText(), expectedDefault,
"Unexpected default value for " + col);
+ }
+ }
+ } finally {
+ // Clean up the schema to the original state
+ forceUpdateSchema(oldSchema);
+ reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+ }
+ }
+
+ private void reloadAndWait(String tableNameWithType, @Nullable String
segmentName) throws Exception {
+ String response = (segmentName != null)
+ ? reloadOfflineSegment(tableNameWithType, segmentName, true)
+ : reloadOfflineTable(tableNameWithType, true);
+ JsonNode responseJson = JsonUtils.stringToJsonNode(response);
+ String jobId;
+ if (segmentName != null) {
+ // Single segment reload response: status is a string, parse manually
+ String statusString = responseJson.get("status").asText();
+ assertTrue(statusString.contains("SUCCESS"), "Segment reload failed: " +
statusString);
+ int startIdx = statusString.indexOf("reload job id:") + "reload job
id:".length();
+ int endIdx = statusString.indexOf(',', startIdx);
+ jobId = statusString.substring(startIdx, endIdx).trim();
+ } else {
+ // Full table reload response: structured JSON
+ JsonNode tableLevelDetails
+ =
JsonUtils.stringToJsonNode(responseJson.get("status").asText()).get(tableNameWithType);
+
Assert.assertEquals(tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText(),
"SUCCESS");
+ jobId = tableLevelDetails.get("reloadJobId").asText();
+ }
+ String finalJobId = jobId;
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return isReloadJobCompleted(finalJobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Reload job did not complete in 10 minutes");
+ }
+
+ private void runQueryAndAssert(String query, String newAddedColumn,
FieldSpec fieldSpec) throws Exception {
+ JsonNode response = postQuery(query);
+ assertNoError(response);
+ JsonNode rows = response.get("resultTable").get("rows");
+ JsonNode jsonSchema = response.get("resultTable").get("dataSchema");
+ DataSchema resultSchema = JsonUtils.jsonNodeToObject(jsonSchema,
DataSchema.class);
+
+ assert !rows.isEmpty();
+ boolean columnPresent = false;
+ String[] columnNames = resultSchema.getColumnNames();
+ for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++)
{
+ if (columnNames[columnIndex].equals(newAddedColumn)) {
+ columnPresent = true;
+ // Check the data type of the new column
+
Assert.assertEquals(resultSchema.getColumnDataType(columnIndex).name(),
fieldSpec.getDataType().name());
+ // Check the value of the new column
+ for (int rowIndex = 0; rowIndex < rows.size(); rowIndex++) {
+ Assert.assertEquals(rows.get(rowIndex).get(columnIndex).asText(),
+ String.valueOf(fieldSpec.getDefaultNullValue()));
+ }
+ }
+ }
+ Assert.assertTrue(columnPresent, "Column " + newAddedColumn + " not
present in result set");
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index 578ee5e5ea..fdf88db410 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -384,13 +384,13 @@ public class MapTypeTest extends
CustomDataQueryClusterIntegrationTest {
// Select non-existing key with proper filter
query = "SELECT jsonExtractScalar(intKeyMapStr, '$.123', 'INT') FROM " +
getTableName()
- + " WHERE jsonExtractKey(intKeyMapStr, '$.*') = \"$['123']\"";
+ + " WHERE jsonExtractKey(intKeyMapStr, '$.*') = '$[''123'']'";
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
assertEquals(rows.size(), 0);
query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k3', 'INT') FROM " +
getTableName()
- + " WHERE jsonExtractKey(stringKeyMapStr, '$.*') = \"$['k3']\"";
+ + " WHERE jsonExtractKey(stringKeyMapStr, '$.*') = '$[''k3'']'";
pinotResponse = postQuery(query);
assertEquals(pinotResponse.get("exceptions").size(), 0);
rows = pinotResponse.get("resultTable").get("rows");
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
similarity index 54%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
index 8f6528fb78..d9146fe720 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
@@ -16,28 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.local.segment.index.column;
+package org.apache.pinot.segment.local.indexsegment;
+import
org.apache.pinot.segment.local.segment.index.column.DefaultNullValueVirtualColumnProvider;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnIndexContainer;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
-import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.data.FieldSpec;
-/**
- * Shared implementation code between column providers.
- */
-public abstract class BaseVirtualColumnProvider implements
VirtualColumnProvider {
-
- protected ColumnMetadataImpl.Builder
getColumnMetadataBuilder(VirtualColumnContext context) {
- return new
ColumnMetadataImpl.Builder().setFieldSpec(context.getFieldSpec())
- .setTotalDocs(context.getTotalDocCount());
+public class IndexSegmentUtils {
+ private IndexSegmentUtils() {
}
- @Override
- public ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext
context) {
- return new VirtualColumnIndexContainer(buildForwardIndex(context),
buildInvertedIndex(context),
- buildDictionary(context));
+ /// Returns a virtual [DataSource] per the given [VirtualColumnContext].
+ public static DataSource createVirtualDataSource(VirtualColumnContext
context) {
+ FieldSpec fieldSpec = context.getFieldSpec();
+ VirtualColumnProvider virtualColumnProvider;
+ if (fieldSpec.getVirtualColumnProvider() != null) {
+ virtualColumnProvider =
VirtualColumnProviderFactory.buildProvider(context);
+ } else {
+ virtualColumnProvider = new DefaultNullValueVirtualColumnProvider();
+ }
+ return virtualColumnProvider.buildDataSource(context);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index b66edb8a0b..5b240e1f22 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.indexsegment.immutable;
+import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
@@ -33,6 +34,8 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -79,7 +82,19 @@ public class EmptyIndexSegment implements ImmutableSegment {
@Override
public DataSource getDataSourceNullable(String column) {
ColumnMetadata columnMetadata =
_segmentMetadata.getColumnMetadataFor(column);
- return columnMetadata != null ? new EmptyDataSource(columnMetadata) : null;
+ return columnMetadata != null ? new
EmptyDataSource(columnMetadata.getFieldSpec()) : null;
+ }
+
+ @Override
+ public DataSource getDataSource(String column, Schema schema) {
+ DataSource dataSource = getDataSourceNullable(column);
+ if (dataSource != null) {
+ return dataSource;
+ }
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in
schema: %s", column,
+ schema.getSchemaName());
+ return new EmptyDataSource(fieldSpec);
}
@Nullable
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 28ea561e8a..d45dc61f3f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -32,11 +32,13 @@ import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.indexsegment.IndexSegmentUtils;
import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.map.ImmutableMapDataSource;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -57,6 +59,7 @@ import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -234,6 +237,19 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
return _segmentMetadata;
}
+ @Override
+ public DataSource getDataSource(String column, Schema schema) {
+ DataSource dataSource = getDataSourceNullable(column);
+ if (dataSource != null) {
+ return dataSource;
+ }
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in
schema: %s", column,
+ schema.getSchemaName());
+ return IndexSegmentUtils.createVirtualDataSource(
+ new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs()));
+ }
+
@Override
public Set<String> getColumnNames() {
return _segmentMetadata.getSchema().getColumnNames();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index ae1ffae0c3..5bf363228e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -49,6 +49,7 @@ import
org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
import org.apache.pinot.segment.local.dedup.DedupRecordInfo;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.indexsegment.IndexSegmentUtils;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
@@ -56,14 +57,12 @@ import
org.apache.pinot.segment.local.realtime.impl.dictionary.SameValueMutableD
import
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
import
org.apache.pinot.segment.local.realtime.impl.forward.SameValueMutableForwardIndex;
import
org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
-import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import
org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.ComparisonColumns;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -1059,15 +1058,24 @@ public class MutableSegmentImpl implements
MutableSegment {
FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
if (fieldSpec != null && fieldSpec.isVirtualColumn()) {
// Virtual column
- // TODO: Refactor virtual column provider to directly generate data
source
VirtualColumnContext virtualColumnContext = new
VirtualColumnContext(fieldSpec, _numDocsIndexed);
- VirtualColumnProvider virtualColumnProvider =
VirtualColumnProviderFactory.buildProvider(virtualColumnContext);
- return new
ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext),
-
virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext));
+ return
VirtualColumnProviderFactory.buildProvider(virtualColumnContext).buildDataSource(virtualColumnContext);
}
return null;
}
+ @Override
+ public DataSource getDataSource(String column, Schema schema) {
+ DataSource dataSource = getDataSourceNullable(column);
+ if (dataSource != null) {
+ return dataSource;
+ }
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in
schema: %s", column,
+ schema.getSchemaName());
+ return IndexSegmentUtils.createVirtualDataSource(new
VirtualColumnContext(fieldSpec, _numDocsIndexed));
+ }
+
@Nullable
@Override
public List<StarTreeV2> getStarTrees() {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
index 282043693e..a9d5bc0bcc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
@@ -30,6 +30,7 @@ import
org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVF
import
org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVInvertedIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.constant.ConstantSortedIndexReader;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
+import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -41,7 +42,7 @@ import org.apache.pinot.spi.utils.ByteArray;
/**
* Provide the default null value.
*/
-public class DefaultNullValueVirtualColumnProvider extends
BaseVirtualColumnProvider {
+public class DefaultNullValueVirtualColumnProvider implements
VirtualColumnProvider {
@Override
public ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context)
{
@@ -86,8 +87,12 @@ public class DefaultNullValueVirtualColumnProvider extends
BaseVirtualColumnProv
@Override
public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
- ColumnMetadataImpl.Builder builder =
getColumnMetadataBuilder(context).setCardinality(1).setHasDictionary(true);
- if (context.getFieldSpec().isSingleValueField()) {
+ FieldSpec fieldSpec = context.getFieldSpec();
+ ColumnMetadataImpl.Builder builder = new
ColumnMetadataImpl.Builder().setFieldSpec(fieldSpec)
+ .setTotalDocs(context.getTotalDocCount())
+ .setCardinality(1)
+ .setHasDictionary(true);
+ if (fieldSpec.isSingleValueField()) {
builder.setSorted(true);
} else {
// When there is no value for a multi-value column, the
maxNumberOfMultiValues and cardinality should be
@@ -96,7 +101,6 @@ public class DefaultNullValueVirtualColumnProvider extends
BaseVirtualColumnProv
builder.setMaxNumberOfMultiValues(1);
}
- FieldSpec fieldSpec = context.getFieldSpec();
Object defaultNullValue = fieldSpec.getDefaultNullValue();
switch (fieldSpec.getDataType().getStoredType()) {
case INT:
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
index 46d8b64046..44c885605f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
@@ -20,7 +20,6 @@ package
org.apache.pinot.segment.local.segment.index.datasource;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
@@ -32,15 +31,15 @@ import org.apache.pinot.spi.data.FieldSpec;
*/
public class EmptyDataSource extends BaseDataSource {
- public EmptyDataSource(ColumnMetadata columnMetadata) {
- super(new EmptyDataSourceMetadata(columnMetadata),
ColumnIndexContainer.Empty.INSTANCE);
+ public EmptyDataSource(FieldSpec fieldSpec) {
+ super(new EmptyDataSourceMetadata(fieldSpec),
ColumnIndexContainer.Empty.INSTANCE);
}
private static class EmptyDataSourceMetadata implements DataSourceMetadata {
final FieldSpec _fieldSpec;
- EmptyDataSourceMetadata(ColumnMetadata columnMetadata) {
- _fieldSpec = columnMetadata.getFieldSpec();
+ EmptyDataSourceMetadata(FieldSpec fieldSpec) {
+ _fieldSpec = fieldSpec;
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
index a1770ef05f..9cf98952b6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.segment.local.segment.virtualcolumn;
-import java.io.IOException;
-import
org.apache.pinot.segment.local.segment.index.column.BaseVirtualColumnProvider;
import org.apache.pinot.segment.local.segment.index.readers.DocIdDictionary;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -33,7 +31,7 @@ import org.apache.pinot.spi.utils.Pairs;
/**
* Virtual column provider that returns the document id.
*/
-public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
+public class DocIdVirtualColumnProvider implements VirtualColumnProvider {
private static final DocIdSortedIndexReader DOC_ID_SORTED_INDEX_READER = new
DocIdSortedIndexReader();
@Override
@@ -47,15 +45,19 @@ public class DocIdVirtualColumnProvider extends
BaseVirtualColumnProvider {
}
@Override
- public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
- ColumnMetadataImpl.Builder columnMetadataBuilder =
super.getColumnMetadataBuilder(context);
-
columnMetadataBuilder.setCardinality(context.getTotalDocCount()).setSorted(true).setHasDictionary(true);
- return columnMetadataBuilder.build();
+ public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext
context) {
+ return DOC_ID_SORTED_INDEX_READER;
}
@Override
- public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext
context) {
- return DOC_ID_SORTED_INDEX_READER;
+ public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+ int numDocs = context.getTotalDocCount();
+ return new
ColumnMetadataImpl.Builder().setFieldSpec(context.getFieldSpec())
+ .setTotalDocs(numDocs)
+ .setCardinality(numDocs)
+ .setSorted(true)
+ .setHasDictionary(true)
+ .build();
}
private static class DocIdSortedIndexReader implements
SortedIndexReader<ForwardIndexReaderContext> {
@@ -76,8 +78,7 @@ public class DocIdVirtualColumnProvider extends
BaseVirtualColumnProvider {
}
@Override
- public void close()
- throws IOException {
+ public void close() {
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
index ee30e08184..043e50057c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.segment.local.segment.virtualcolumn;
+import
org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -30,13 +32,21 @@ import
org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
* comprise a proper column.
*/
public interface VirtualColumnProvider {
+
ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context);
Dictionary buildDictionary(VirtualColumnContext context);
+ InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context);
+
ColumnMetadata buildMetadata(VirtualColumnContext context);
- InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context);
+ default ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext
context) {
+ return new VirtualColumnIndexContainer(buildForwardIndex(context),
buildInvertedIndex(context),
+ buildDictionary(context));
+ }
- ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext context);
+ default DataSource buildDataSource(VirtualColumnContext context) {
+ return new ImmutableDataSource(buildMetadata(context),
buildColumnIndexContainer(context));
+ }
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 4fdf1d6fab..c045dbd03d 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -62,10 +62,10 @@ public interface IndexSegment {
Set<String> getPhysicalColumnNames();
/// Returns the [DataSource] for the given column.
- /// TODO: Revisit all usage of this method to support virtual [DataSource].
+ /// This API is used when the column is guaranteed to exist in the segment.
default DataSource getDataSource(String column) {
DataSource dataSource = getDataSourceNullable(column);
- Preconditions.checkState(dataSource != null, "Failed to find data source
for column: ", column);
+ Preconditions.checkState(dataSource != null, "Failed to find data source
for column: %s", column);
return dataSource;
}
@@ -76,10 +76,7 @@ public interface IndexSegment {
/// Returns the [DataSource] for the given column, or creates a virtual one
if it doesn't exist. The passed in
/// [Schema] should be the latest schema of the table, not the one from
[SegmentMetadata], and should contain the
/// asked column.
- /// TODO: Add support for virtual [DataSource].
- default DataSource getDataSource(String column, Schema schema) {
- return getDataSource(column);
- }
+ DataSource getDataSource(String column, Schema schema);
/**
* Returns a list of star-trees (V2), or null if there is no star-tree (V2)
in the segment.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]