This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a9f36e438ce Followups on column reader changes (#17293)
a9f36e438ce is described below
commit a9f36e438cec8ab80139acf03993e39947582c66
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Dec 18 09:45:26 2025 +0530
Followups on column reader changes (#17293)
* Followups on column reader changes
* Handle nulls in default columns more appropriately
* Add isSingleValue() to ColumnReader
* Minor refactoring in EpochTimeHandler
* Avoid null changes of DefaultValueColumnReader in this PR
---
.../processing/timehandler/EpochTimeHandler.java | 3 +-
.../segment/readers/DefaultValueColumnReader.java | 13 +++++
.../readers/PinotSegmentColumnReaderFactory.java | 9 +--
.../readers/PinotSegmentColumnReaderImpl.java | 11 ++++
.../readers/PinotSegmentColumnarDataSource.java | 61 ++++++++++++++++++++
.../pinot/spi/data/readers/ColumnReader.java | 22 +++++++-
.../spi/data/readers/ColumnReaderFactory.java | 6 +-
.../pinot/spi/data/readers/ColumnarDataSource.java | 66 ++++++++++++++++++++++
8 files changed, 178 insertions(+), 13 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
index bf4b4ef48df..a3fbd47b601 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
@@ -105,10 +105,9 @@ public class EpochTimeHandler implements TimeHandler {
@Override
@Nullable
public Object getModifiedTimeValue(Object columnValue) {
- long timeMs = _formatSpec.fromFormatToMillis(columnValue.toString());
-
// Round time if needed
if (_roundBucketMs > 0) {
+ long timeMs = _formatSpec.fromFormatToMillis(columnValue.toString());
timeMs = (timeMs / _roundBucketMs) * _roundBucketMs;
return _dataType.convert(_formatSpec.fromMillisToFormat(timeMs));
} else {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
index ee70b620138..6e97958f96d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
@@ -34,6 +34,7 @@ public class DefaultValueColumnReader implements ColumnReader
{
private final String _columnName;
private final int _numDocs;
private final Object _defaultValue;
+ private final FieldSpec _fieldSpec;
private final FieldSpec.DataType _dataType;
// Pre-computed multi-value arrays for reuse
@@ -57,6 +58,7 @@ public class DefaultValueColumnReader implements ColumnReader
{
_columnName = columnName;
_numDocs = numDocs;
_currentIndex = 0;
+ _fieldSpec = fieldSpec;
_dataType = fieldSpec.getDataType();
// For multi-value fields, wrap the default value in an array
@@ -146,6 +148,11 @@ public class DefaultValueColumnReader implements
ColumnReader {
_currentIndex++;
}
+ @Override
+ public boolean isSingleValue() {
+ return _fieldSpec.isSingleValueField(); // answered from the FieldSpec captured in the constructor
+ }
+
@Override
public boolean isInt() {
return _dataType == FieldSpec.DataType.INT;
@@ -338,6 +345,12 @@ public class DefaultValueColumnReader implements
ColumnReader {
return (byte[]) _defaultValue;
}
+ @Override
+ public Object getValue(int docId) {
+ validateDocId(docId); // range-check even though the returned value does not depend on docId
+ return _defaultValue; // NOTE(review): for multi-value fields, confirm this is the array-wrapped default, not the scalar
+ }
+
// Multi-value accessors
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
index e9d16782fdc..2b40444a52a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
@@ -91,17 +91,12 @@ public class PinotSegmentColumnReaderFactory implements
ColumnReaderFactory {
}
@Override
- public ColumnReader getColumnReader(String columnName)
- throws IOException {
+ public ColumnReader getColumnReader(String columnName) {
if (_targetSchema == null) {
throw new IllegalStateException("Factory not initialized. Call init()
first.");
}
- ColumnReader reader = _columnReaders.get(columnName);
- if (reader == null) {
- throw new IOException("Column reader not found for column: " +
columnName);
- }
- return reader;
+ return _columnReaders.get(columnName); // null when the map has no entry for the column; the interface marks the result @Nullable
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
index 827839bdee8..2af71982e56 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
@@ -109,6 +109,11 @@ public class PinotSegmentColumnReaderImpl implements
ColumnReader {
_currentIndex++;
}
+ @Override
+ public boolean isSingleValue() {
+ return _segmentColumnReader.isSingleValue(); // delegates to the wrapped segment column reader
+ }
+
@Override
public boolean isInt() {
return _dataType == FieldSpec.DataType.INT;
@@ -314,6 +319,12 @@ public class PinotSegmentColumnReaderImpl implements
ColumnReader {
return _segmentColumnReader.getBytes(docId);
}
+ @Override
+ public Object getValue(int docId)
+ throws IOException {
+ return _segmentColumnReader.getValue(docId); // no local docId range check; relies on the wrapped reader for IndexOutOfBoundsException — TODO confirm
+ }
+
// Multi-value accessors
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
new file mode 100644
index 00000000000..8b3306f6946
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.ColumnReaderFactory;
+import org.apache.pinot.spi.data.readers.ColumnarDataSource;
+
+
+/**
+ * {@link ColumnarDataSource} implementation that wraps a Pinot {@link IndexSegment}.
+ * Provides columnar access to segment data via {@link PinotSegmentColumnReaderFactory}.
+ * <p>
+ * The segment is not owned by this class: {@link #close()} does nothing, so whoever supplied the
+ * segment remains responsible for its lifecycle. The segment must be non-null, since the constructor
+ * immediately reads its metadata to capture the total document count.
+ */
+public class PinotSegmentColumnarDataSource implements ColumnarDataSource {
+
+ private final IndexSegment _indexSegment;
+ private final int _totalDocs;
+
+ public PinotSegmentColumnarDataSource(IndexSegment indexSegment) {
+ _indexSegment = indexSegment;
+ _totalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); // snapshot taken once; not refreshed later
+ }
+
+ @Override
+ public int getTotalDocs() {
+ return _totalDocs;
+ }
+
+ @Override
+ public ColumnReaderFactory createColumnReaderFactory() {
+ return new PinotSegmentColumnReaderFactory(_indexSegment); // a fresh factory instance on every call
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ // Segment lifecycle is managed externally, so no cleanup needed here
+ }
+
+ @Override
+ public String toString() {
+ return "PinotSegmentColumnarDataSource{segment=" +
_indexSegment.getSegmentName() + "}";
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
index faca8bfac07..28bc576acb4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
@@ -188,6 +188,11 @@ public interface ColumnReader extends Closeable,
Serializable {
*/
void skipNext() throws IOException;
+ /**
+ * Check if the column data is single-value or multi-value.
+ *
+ * @return true if the column is single-value, false if it is multi-value
+ */
+ boolean isSingleValue();
+
/**
* Check if the column data type from the actual reader can be returned as
the expected type directly.
* For multi-value columns, this indicates if the multi-value type specific
methods can be called directly.
@@ -262,7 +267,7 @@ public interface ColumnReader extends Closeable,
Serializable {
/**
* Get int / long / float / double / string / byte[] value at the given
document ID for single-value columns.
* Should be called only if isNull(docId) returns false.
- * <p>Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()}
- 1.
+ * Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()} - 1.
*
* @param docId Document ID (0-based)
* @throws IndexOutOfBoundsException If docId is out of range
@@ -275,6 +280,21 @@ public interface ColumnReader extends Closeable,
Serializable {
String getString(int docId) throws IOException;
byte[] getBytes(int docId) throws IOException;
+ /**
+ * Get the value at the given document ID as a Java Object.
+ * Can be used for both single-value and multi-value columns.
+ * <p>
+ * This should be used when:
+ * <ol>
+ * <li>The consuming API does not yet support the primitive-type-specific methods (e.g. TimeHandler,
+ * Partitioner), so the value would be boxed anyway; or</li>
+ * <li>The required data type does not match the actual type and the caller will handle the
+ * conversion.</li>
+ * </ol>
+ * Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()} - 1.
+ * <p>
+ * NOTE(review): unlike the type-specific single-value getters, this method does not state whether
+ * {@code isNull(docId)} must be checked first, nor which Object type represents multi-value entries
+ * — confirm and document.
+ *
+ * @param docId Document ID (0-based)
+ * @return the value at {@code docId} as an Object
+ * @throws IndexOutOfBoundsException If docId is out of range
+ * @throws IOException If an I/O error occurs while reading
+ */
+ Object getValue(int docId) throws IOException;
+
// Multi-value accessors
/**
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
index eadb1ad6564..e31293c0b96 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.data.Schema;
@@ -70,11 +71,10 @@ public interface ColumnReaderFactory extends Closeable,
Serializable {
* Implementations may cache and reuse readers for efficiency.
*
* @param columnName Name of the column to read
* @return ColumnReader instance for the specified column (may be cached)
+ * or {@code null} if the column doesn't exist in the source
- * @throws IOException If the column reader cannot be obtained
*/
- ColumnReader getColumnReader(String columnName)
- throws IOException;
+ @Nullable ColumnReader getColumnReader(String columnName);
/**
* Get all column readers for the target schema.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
new file mode 100644
index 00000000000..a0d1da64cc3
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A columnar data source from which independent {@link ColumnReaderFactory} instances can be created.
+ * <p>
+ * This interface is designed to support the creation of multiple {@link ColumnReaderFactory} instances
+ * from the same underlying data source. This capability is required because different processing contexts
+ * may require distinct column reader objects for the same input source and column. For example:
+ * <ul>
+ * <li>Different processing stages may need independent iterators over the same column</li>
+ * <li>Parallel processing threads may each require their own column readers</li>
+ * </ul>
+ * The data source is {@link Closeable}; what {@code close()} releases is implementation-specific.
+ */
+public interface ColumnarDataSource extends Closeable {
+
+ /**
+ * Returns the total number of documents (rows) in this data source.
+ *
+ * @return the total document count
+ */
+ int getTotalDocs();
+
+ /**
+ * Creates a new {@link ColumnReaderFactory} instance for this data source.
+ * <p>
+ * Multiple factories can be created from the same data source, allowing different consumers
+ * to independently read and process the columnar data. Each factory can then create its own
+ * set of column readers without interfering with readers created by other factories.
+ * <p>
+ * NOTE(review): {@link ColumnReaderFactory} is itself Closeable — document whether the caller
+ * is responsible for closing the returned factory.
+ *
+ * @return a new column reader factory instance
+ * @throws IOException if an I/O error occurs while creating the factory
+ */
+ ColumnReaderFactory createColumnReaderFactory()
+ throws IOException;
+
+ /**
+ * Returns a string description of the underlying data source.
+ * <p>
+ * This typically includes information such as the file name, path, or other identifiers
+ * that help identify the source of the data. Redeclaring {@code toString()} here only documents
+ * the expectation: implementations that do not override it still inherit {@link Object#toString()}.
+ *
+ * @return a description of the underlying source (e.g., file name)
+ */
+ String toString();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]