Copilot commented on code in PR #18434:
URL: https://github.com/apache/pinot/pull/18434#discussion_r3198591889
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -73,24 +89,42 @@ public GenericRow decode(byte[] payload, GenericRow destination) {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
ReadableByteChannel channel = Channels.newChannel(inputStream);
Review Comment:
`decode(byte[], GenericRow)` doesn't guard against a null (or empty)
`payload`. A null payload makes `new ByteArrayInputStream(payload)` throw a
`NullPointerException` while the try-with-resources resources are being
initialized, and the catch block then throws again when it logs
`payload.length`, masking the original error. Add an early return for
null/empty payloads and make the error logging null-safe.
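
A minimal sketch of the suggested guard, in plain Java so it runs without Arrow or Pinot on the classpath. The class `NullSafeDecodeSketch`, the helper `describeLength`, and the use of `Map<String, Object>` in place of `GenericRow` are assumptions made for illustration, not code from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;

// Plain-Java stand-in for the decoder; Map<String, Object> plays the role of GenericRow (assumption).
final class NullSafeDecodeSketch {

  // Hypothetical helper: describes the payload size without dereferencing a null array.
  static String describeLength(byte[] payload) {
    return payload == null ? "null" : Integer.toString(payload.length);
  }

  // Returns null for null/empty payloads instead of letting the stream constructor throw.
  static Map<String, Object> decode(byte[] payload, Map<String, Object> destination) {
    if (payload == null || payload.length == 0) {
      return null;
    }
    try (ByteArrayInputStream in = new ByteArrayInputStream(payload)) {
      // Placeholder for the real Arrow decoding: record the first byte only.
      destination.put("firstByte", in.read());
      return destination;
    } catch (IOException e) {
      // Null-safe logging: never touch payload.length directly in the error path.
      System.err.println("Decode failed, payload length: " + describeLength(payload));
      return null;
    }
  }
}
```

With the early return in place, the error path can no longer be reached with a null payload, and the null-safe helper keeps the log statement from hiding the original exception if that ever changes.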
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -73,24 +89,42 @@ public GenericRow decode(byte[] payload, GenericRow destination) {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
ReadableByteChannel channel = Channels.newChannel(inputStream);
ArrowStreamReader reader = new ArrowStreamReader(channel, _allocator)) {
+ if (!reader.loadNextBatch()) {
+ LOGGER.warn("No data found in Arrow message for topic: {}", _streamTopicName);
+ return null;
+ }
- // Read the Arrow schema and data
VectorSchemaRoot root = reader.getVectorSchemaRoot();
- if (!reader.loadNextBatch()) {
- logger.warn("No data found in Arrow message for topic: {}", _streamTopicName);
+ int rowCount = root.getRowCount();
+ if (rowCount == 0) {
return null;
}
- // Convert Arrow data to GenericRow using converter
- GenericRow row = _converter.convert(reader, root, destination);
+ _extractor.setReader(reader);
+ if (destination == null) {
+ destination = new GenericRow();
+ }
+ ArrowRecordExtractor.Record record = new ArrowRecordExtractor.Record();
- return row;
+ if (rowCount == 1) {
+ // Single row — fill destination directly (the GenericRow is the row).
+ record.set(0);
+ _extractor.extract(record, destination);
Review Comment:
When `rowCount == 1`, the decoder populates the destination directly but
never clears/removes `GenericRow.MULTIPLE_RECORDS_KEY`. If the caller reuses
the same `GenericRow` instance across messages and a previous decode produced
multiple rows, the stale key will cause downstream processing to treat this
message as multi-row. Remove the key (e.g.,
`destination.removeValue(GenericRow.MULTIPLE_RECORDS_KEY)`) on the single-row
path.
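
The stale-key scenario can be reproduced with a small stand-in. In this sketch a `Map<String, Object>` replaces `GenericRow`, and the class `StaleKeySketch`, the method `fill`, and the literal used for the key are all hypothetical; only the idea of removing the key on the single-row path comes from the comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in: a Map plays the role of a reused GenericRow (assumption).
final class StaleKeySketch {

  // Stand-in for GenericRow.MULTIPLE_RECORDS_KEY; the literal here is illustrative only.
  static final String MULTIPLE_RECORDS_KEY = "$MULTIPLE_RECORDS_KEY$";

  // Mirrors the three output shapes: null for 0 rows, direct fill for 1 row, wrapped list otherwise.
  static Map<String, Object> fill(List<Map<String, Object>> rows, Map<String, Object> destination) {
    if (rows.isEmpty()) {
      return null;
    }
    if (rows.size() == 1) {
      // Drop any multi-row marker left behind by a previous decode into the same destination.
      destination.remove(MULTIPLE_RECORDS_KEY);
      destination.putAll(rows.get(0));
    } else {
      destination.put(MULTIPLE_RECORDS_KEY, new ArrayList<>(rows));
    }
    return destination;
  }
}
```

Decoding a two-row message and then a one-row message into the same destination leaves no multi-row marker behind, which is the behavior the single-row path needs.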
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordExtractor.java:
##########
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.utils.TimestampUtils;
+
+
+/// Extracts a single Arrow row into a [GenericRow]. Reader-scoped state
([VectorSchemaRoot] +
+/// dictionary map) is bound once via [#setReader]; per-row [#extract] calls
take a [Record] holding
+/// only the row index. Dispatch is schema-driven — each column is walked
using its [Field], so the
+/// logical type drives the conversion rather than the runtime Java type of
the value.
+///
+/// **Scalars** (Arrow type → Java output):
+/// - `Bool` → `Boolean`
+/// - `Int(8/16)` → `Integer` (widened from `Byte` / `Short`)
+/// - `Int(32)` → `Integer`
+/// - `Int(64)` → `Long`
+/// - `FloatingPoint(SINGLE)` → `Float`
+/// - `FloatingPoint(DOUBLE)` → `Double`
+/// - `Decimal` → `BigDecimal`
+/// - `Utf8` / `LargeUtf8` → `String` (via `Text.toString()`)
+/// - `Binary` / `LargeBinary` / `FixedSizeBinary` → `byte[]`
+/// - `Null` → `null` (every row is null by definition)
+///
+/// **Temporal** (per the schema's `DateUnit` / `TimeUnit`):
+/// - `Timestamp` no-TZ → [Timestamp] (Arrow surfaces all four units as
`LocalDateTime`; interpreted
+/// as a UTC instant)
+/// - `Timestamp` with-TZ → [Timestamp] (Arrow surfaces all four units as
`Long` epoch; constructed
+/// per the schema's `TimeUnit`, sub-millisecond precision preserved via
[TimestampUtils])
+/// - `Date` → [LocalDate] (`DateDayVector` surfaces as `Integer` raw days;
`DateMilliVector` as
+/// `LocalDateTime` at UTC midnight — both reduce to a calendar date)
+/// - `Time` → [LocalTime] (`TimeSecVector` as `Integer`, `TimeMilliVector` as
`LocalDateTime`,
+/// `TimeMicroVector` / `TimeNanoVector` as `Long` — all collapse onto
nanoseconds-since-midnight)
+/// - `Interval` / `Duration` → ISO-8601 `String` via `value.toString()` —
`java.time.Period` /
+/// `java.time.Duration` / `PeriodDuration` all have meaningful toString
(e.g. `"P1Y2M"`,
+/// `"PT5H30M"`, `"P1Y2M3D PT4H5M6S"`)
+///
+/// With `extractRawTimeValues = true` ([ArrowRecordExtractorConfig]) the
`Date` / `Time` /
+/// `Timestamp` cases bypass the contract conversion: `Date` → `int`
days-since-epoch (regardless of
+/// `DateUnit` — `DateMilli` is always UTC midnight, so reducing to days is
lossless); `Time` /
+/// `Timestamp` → raw `int` / `long` in the schema's `TimeUnit`. `Interval` /
`Duration` are
+/// unaffected. Temporal values that surface inside a `Union` branch don't see
the bypass either —
+/// the chosen branch's [Field] isn't visible from the value alone, so we
can't pick a unit; they
+/// always coerce to `Timestamp` UTC.
+///
+/// **Complex** (recurse with the [Field]'s child fields):
+/// - `List` / `LargeList` / `FixedSizeList` → `Object[]`
+/// - `Struct` → `Map<String, Object>`
+/// - `Map` → `Map<String, Object>` (Arrow's `List<Map<KEY, VALUE>>` entry
list is flattened;
+/// keys are stringified per [BaseRecordExtractor#stringifyMapKey])
+/// - `Union` → recursively dispatched by the value's runtime Java type (the
chosen branch isn't
+/// visible from the value alone — nested complex sub-branches fall back to
`value.toString()`)
+///
+/// **Other**:
+/// - dictionary-encoded vector → decoded against the bound dictionary, then
dispatched on the
+/// decoded vector's [Field] (so the logical type — e.g. `Utf8` — drives
conversion, not the
+/// dictionary's index type)
+///
+/// Unrecognized types (`NONE` / future Arrow additions) throw
[IllegalStateException].
+///
+/// **Quirks worth knowing:**
+/// - `UInt2Vector` (unsigned 16-bit) returns `Character`, not a `Number` —
Arrow's Java bindings
+/// use `char` as the only natively unsigned primitive. We widen to `int`
per the contract.
+/// - `DateDayVector` / `DateMilliVector` return *different* Java types
(`Integer` vs
+/// `LocalDateTime`) for the same logical `DATE` type — historical asymmetry
in Arrow's API.
+public class ArrowRecordExtractor extends
BaseRecordExtractor<ArrowRecordExtractor.Record> {
+
+ /// One Arrow row's worth of state — just the row index. The reader-scoped
state ([VectorSchemaRoot] + [ArrowReader])
+ /// is bound to the extractor itself via [#setReader].
+ public static final class Record {
+ int _rowId;
+
+ public void set(int rowId) {
+ _rowId = rowId;
+ }
+ }
+
+ private boolean _extractRawTimeValues;
+
+ // Reader-scoped state — initialized in [#setReader], read by per-row
[#extract]. The dictionary map
+ // is held so per-row lookups don't re-traverse the reader; the field
vectors are pre-resolved against
+ // the include list so the per-row loop is a flat array walk (names are read
inline from each field
+ // vector — `Field#getName` is a plain getter).
+ private Map<Long, Dictionary> _dictionaries;
+ private FieldVector[] _fieldVectors;
+
+ @Override
+ protected void initConfig(@Nullable RecordExtractorConfig config) {
+ if (config instanceof ArrowRecordExtractorConfig) {
+ _extractRawTimeValues = ((ArrowRecordExtractorConfig)
config).isExtractRawTimeValues();
+ }
+ }
+
+ /// Binds the extractor to `reader` for the upcoming run of [#extract]
calls. Must be called before
+ /// [#extract] — once per file (`ArrowRecordReader`) or per `decode()` call
(`ArrowMessageDecoder`).
+ /// Resolves the include list against the reader's [VectorSchemaRoot] and
stashes the dictionary map.
+ public void setReader(ArrowReader reader)
+ throws IOException {
+ _dictionaries = reader.getDictionaryVectors();
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ if (_extractAll) {
+ _fieldVectors = fieldVectors.toArray(new FieldVector[0]);
+ } else {
+ List<FieldVector> matched = new ArrayList<>(_fields.size());
+ for (FieldVector fieldVector : fieldVectors) {
+ if (_fields.contains(fieldVector.getField().getName())) {
+ matched.add(fieldVector);
+ }
+ }
+ _fieldVectors = matched.toArray(new FieldVector[0]);
+ }
+ }
+
+ @Override
+ public GenericRow extract(Record from, GenericRow to) {
+ for (FieldVector fieldVector : _fieldVectors) {
+ to.putValue(fieldVector.getField().getName(), extractValue(fieldVector,
from._rowId));
+ }
+ return to;
+ }
+
+ /// Reads the raw Arrow value at `rowId` (decoding the dictionary-encoded
vector against the bound
+ /// dictionary map first when applicable), then dispatches by the [Field]'s
logical type. The decoded
+ /// vector's [Field] (not the original encoded one) drives the dispatch so
the logical type — e.g.
+ /// `Utf8` — is used instead of the dictionary's index type.
+ @Nullable
+ private Object extractValue(FieldVector fieldVector, int rowId) {
+ DictionaryEncoding dictionaryEncoding =
fieldVector.getField().getDictionary();
+ if (dictionaryEncoding != null) {
+ try (ValueVector decoded = DictionaryEncoder.decode(fieldVector,
_dictionaries.get(dictionaryEncoding.getId()))) {
+ Object rawValue = decoded.getObject(rowId);
+ return rawValue != null ? convert(decoded.getField(), rawValue) : null;
Review Comment:
For dictionary-encoded columns, `extractValue` calls
`DictionaryEncoder.decode(...)` inside the per-row extraction path and closes
the decoded vector each time. Each call decodes the whole vector, so in
`ArrowRecordReader` the same dictionary column is decoded once per row and the
cost per batch grows roughly quadratically with the row count. Consider
decoding dictionary vectors once per batch in `setReader(...)` (or caching a
per-field decoded vector / lookup) and reusing it across row extractions.
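
One way to picture the per-batch cache is the sketch below. It does not use Arrow: `EncodedColumn` is a hypothetical stand-in for a dictionary-encoded vector (an index array plus a dictionary array), and `BatchDecodeCacheSketch`, `startBatch`, and `valueAt` are illustrative names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in showing "decode once per batch, look up per row" (assumption: no Arrow types).
final class BatchDecodeCacheSketch {

  // Hypothetical stand-in for a dictionary-encoded column: indices into a dictionary.
  static final class EncodedColumn {
    final String name;
    final int[] indices;
    final String[] dictionary;

    EncodedColumn(String name, int[] indices, String[] dictionary) {
      this.name = name;
      this.indices = indices;
      this.dictionary = dictionary;
    }
  }

  // Counts full-column decodes so the saving is observable.
  int decodeCalls;
  private final Map<String, String[]> decodedByColumn = new HashMap<>();

  // Call once per batch (the analogue of setReader / loading the next batch).
  void startBatch() {
    decodedByColumn.clear();
  }

  // Per-row lookup: decodes the column on first access in this batch, then reuses the result.
  String valueAt(EncodedColumn column, int rowId) {
    String[] decoded = decodedByColumn.computeIfAbsent(column.name, k -> decodeWholeColumn(column));
    return decoded[rowId];
  }

  private String[] decodeWholeColumn(EncodedColumn column) {
    decodeCalls++;
    String[] out = new String[column.indices.length];
    for (int i = 0; i < out.length; i++) {
      out[i] = column.dictionary[column.indices[i]];
    }
    return out;
  }
}
```

In the real extractor the cached decoded vectors hold Arrow-allocated memory, so they would also need to be closed when the next batch is loaded or the reader is released; the sketch leaves that out.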
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -18,52 +18,68 @@
*/
package org.apache.pinot.plugin.inputformat.arrow;
-
import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into
Pinot GenericRow.
- * This decoder handles Arrow streaming format and converts Arrow data to
Pinot's columnar format.
- */
+
+/// Decodes Apache Arrow IPC stream-format messages into Pinot [GenericRow]s.
The output shape depends on the Arrow
+/// batch's row count:
+/// - 0 row → returns `null` (nothing to ingest).
+/// - 1 row → the single row's fields are populated directly into the
destination [GenericRow].
+/// - multiple rows → the rows are wrapped in a `List<GenericRow>` stored
under [GenericRow#MULTIPLE_RECORDS_KEY]
+/// on the destination.
public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB default
- private static final Logger logger = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private ArrowRecordExtractor _extractor;
private String _streamTopicName;
private RootAllocator _allocator;
- private ArrowToGenericRowConverter _converter;
@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
throws Exception {
+ // Resolve the extractor + config classes from props. Defaults to
`ArrowRecordExtractor` /
+ // `ArrowRecordExtractorConfig`; user-supplied extractors must subclass
[ArrowRecordExtractor]
+ // so the per-batch `setReader` hook is honored.
+ String extractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
+ String configClass = props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
+ if (extractorClass == null) {
+ extractorClass = ArrowRecordExtractor.class.getName();
+ configClass = ArrowRecordExtractorConfig.class.getName();
+ }
+ RecordExtractorConfig extractorConfig = null;
+ if (configClass != null) {
+ extractorConfig = PluginManager.get().createInstance(configClass);
+ extractorConfig.init(props);
+ }
+ _extractor = PluginManager.get().createInstance(extractorClass);
+ _extractor.init(fieldsToRead, extractorConfig);
_streamTopicName = topicName;
// Initialize Arrow allocator with configurable memory limit
- long allocatorLimit =
- Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, DEFAULT_ALLOCATOR_LIMIT));
+ long allocatorLimit = Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, DEFAULT_ALLOCATOR_LIMIT));
_allocator = new RootAllocator(allocatorLimit);
- // Initialize Arrow to GenericRow converter (processes all fields)
- _converter = new ArrowToGenericRowConverter();
-
- logger.info(
- "Initialized ArrowMessageDecoder for topic: {} with allocator limit: {} bytes",
- topicName,
+ LOGGER.info("Initialized ArrowMessageDecoder for topic: {} with allocator limit: {} bytes", topicName,
allocatorLimit);
Review Comment:
`init(...)` overwrites `_allocator` on every call without closing any
existing allocator first. Because the decoder is re-initializable (see tests),
this can leak direct memory until process exit. Consider closing the previous
allocator (or calling `close()`) before assigning a new `RootAllocator`.
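
A minimal sketch of the close-before-reassign pattern follows. `FakeAllocator` is a hypothetical stand-in for `RootAllocator` so the example runs without Arrow, and `ReinitSketch` is an illustrative name for the decoder; the shape of the real `close()` method is an assumption.

```java
// Plain-Java stand-in for a re-initializable decoder that owns a closeable allocator (assumption).
final class ReinitSketch implements AutoCloseable {

  // Hypothetical stand-in for RootAllocator: it only records whether it was closed.
  static final class FakeAllocator implements AutoCloseable {
    final long limit;
    boolean closed;

    FakeAllocator(long limit) {
      this.limit = limit;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  private FakeAllocator allocator;

  // Releases any allocator from a previous init before creating the new one.
  FakeAllocator init(long limit) {
    close();
    allocator = new FakeAllocator(limit);
    return allocator;
  }

  // Idempotent: safe to call before the first init and more than once afterwards.
  @Override
  public void close() {
    if (allocator != null) {
      allocator.close();
      allocator = null;
    }
  }
}
```

Routing re-initialization through `close()` keeps a single release path, so anything else the decoder later owns is cleaned up the same way.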
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -18,52 +18,68 @@
*/
package org.apache.pinot.plugin.inputformat.arrow;
-
import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into
Pinot GenericRow.
- * This decoder handles Arrow streaming format and converts Arrow data to
Pinot's columnar format.
- */
+
+/// Decodes Apache Arrow IPC stream-format messages into Pinot [GenericRow]s.
The output shape depends on the Arrow
+/// batch's row count:
+/// - 0 row → returns `null` (nothing to ingest).
+/// - 1 row → the single row's fields are populated directly into the
destination [GenericRow].
+/// - multiple rows → the rows are wrapped in a `List<GenericRow>` stored
under [GenericRow#MULTIPLE_RECORDS_KEY]
+/// on the destination.
public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB
default
- private static final Logger logger =
LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private ArrowRecordExtractor _extractor;
private String _streamTopicName;
private RootAllocator _allocator;
- private ArrowToGenericRowConverter _converter;
@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
throws Exception {
+ // Resolve the extractor + config classes from props. Defaults to
`ArrowRecordExtractor` /
+ // `ArrowRecordExtractorConfig`; user-supplied extractors must subclass
[ArrowRecordExtractor]
+ // so the per-batch `setReader` hook is honored.
+ String extractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
+ String configClass = props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
+ if (extractorClass == null) {
+ extractorClass = ArrowRecordExtractor.class.getName();
+ configClass = ArrowRecordExtractorConfig.class.getName();
+ }
+ RecordExtractorConfig extractorConfig = null;
+ if (configClass != null) {
+ extractorConfig = PluginManager.get().createInstance(configClass);
+ extractorConfig.init(props);
+ }
+ _extractor = PluginManager.get().createInstance(extractorClass);
Review Comment:
The extractor is instantiated via `PluginManager`, but the code doesn't
validate that the configured class actually extends `ArrowRecordExtractor`. A
misconfiguration will fail with an opaque `ClassCastException` and/or miss the
required `setReader(...)` behavior. Add an explicit `instanceof
ArrowRecordExtractor` check with a clear error message (or fall back to the
default extractor).
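
The validation could look roughly like the sketch below. It uses plain reflection instead of `PluginManager` so that it runs standalone; `BaseExtractor` is a hypothetical stand-in for `ArrowRecordExtractor`, and `ExtractorValidationSketch`, `CustomExtractor`, and `UnrelatedExtractor` exist only for illustration.

```java
// Plain-Java stand-in: validates a reflectively created extractor before it is used (assumption).
final class ExtractorValidationSketch {

  // Hypothetical stand-in for ArrowRecordExtractor.
  static class BaseExtractor {
  }

  // A valid user-supplied extractor: it subclasses the base type.
  static class CustomExtractor extends BaseExtractor {
  }

  // A misconfigured class: instantiable, but not a subclass of the base type.
  static class UnrelatedExtractor {
  }

  // Instantiates className and fails with a descriptive message instead of a ClassCastException.
  static BaseExtractor create(String className) {
    Object instance;
    try {
      instance = Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate record extractor: " + className, e);
    }
    if (!(instance instanceof BaseExtractor)) {
      throw new IllegalArgumentException(
          "Record extractor " + className + " must extend " + BaseExtractor.class.getName());
    }
    return (BaseExtractor) instance;
  }
}
```

Failing fast in `init(...)` surfaces the misconfiguration with the offending class name, rather than at the first `decode(...)` call.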
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.