Jackie-Jiang commented on code in PR #18434:
URL: https://github.com/apache/pinot/pull/18434#discussion_r3199609161
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -73,24 +89,42 @@ public GenericRow decode(byte[] payload, GenericRow destination) {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
ReadableByteChannel channel = Channels.newChannel(inputStream);
Review Comment:
Not applicable — the caller `StreamDataDecoderImpl.decode` asserts that the
message value is non-null (`assert value != null`) before passing it to the
decoder as `payload`, so a null payload cannot reach this method.
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -73,24 +89,42 @@ public GenericRow decode(byte[] payload, GenericRow destination) {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
ReadableByteChannel channel = Channels.newChannel(inputStream);
ArrowStreamReader reader = new ArrowStreamReader(channel, _allocator)) {
+ if (!reader.loadNextBatch()) {
LOGGER.warn("No data found in Arrow message for topic: {}", _streamTopicName);
+ return null;
+ }
- // Read the Arrow schema and data
VectorSchemaRoot root = reader.getVectorSchemaRoot();
- if (!reader.loadNextBatch()) {
- logger.warn("No data found in Arrow message for topic: {}", _streamTopicName);
+ int rowCount = root.getRowCount();
+ if (rowCount == 0) {
return null;
}
- // Convert Arrow data to GenericRow using converter
- GenericRow row = _converter.convert(reader, root, destination);
+ _extractor.setReader(reader);
+ if (destination == null) {
+ destination = new GenericRow();
+ }
+ ArrowRecordExtractor.Record record = new ArrowRecordExtractor.Record();
- return row;
+ if (rowCount == 1) {
+ // Single row — fill destination directly (the GenericRow is the row).
+ record.set(0);
+ _extractor.extract(record, destination);
Review Comment:
Not applicable — the caller (`StreamDataDecoderImpl.decode`) calls
`_reuse.clear()` on the reused row it passes in as `destination` before each
invocation, so the destination is empty on entry. Clearing
`MULTIPLE_RECORDS_KEY` here would be redundant.
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -18,52 +18,68 @@
*/
package org.apache.pinot.plugin.inputformat.arrow;
-
import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into Pinot GenericRow.
- * This decoder handles Arrow streaming format and converts Arrow data to Pinot's columnar format.
- */
+
+/// Decodes Apache Arrow IPC stream-format messages into Pinot [GenericRow]s. The output shape depends on the Arrow
+/// batch's row count:
+/// - 0 rows → returns `null` (nothing to ingest).
+/// - 1 row → the single row's fields are populated directly into the destination [GenericRow].
+/// - multiple rows → the rows are wrapped in a `List<GenericRow>` stored under [GenericRow#MULTIPLE_RECORDS_KEY]
+/// on the destination.
public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB default
- private static final Logger logger = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private ArrowRecordExtractor _extractor;
private String _streamTopicName;
private RootAllocator _allocator;
- private ArrowToGenericRowConverter _converter;
@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
+ // Resolve the extractor + config classes from props. Defaults to `ArrowRecordExtractor` /
+ // `ArrowRecordExtractorConfig`; user-supplied extractors must subclass [ArrowRecordExtractor]
+ // so the per-batch `setReader` hook is honored.
+ String extractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
+ String configClass = props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
+ if (extractorClass == null) {
+ extractorClass = ArrowRecordExtractor.class.getName();
+ configClass = ArrowRecordExtractorConfig.class.getName();
+ }
+ RecordExtractorConfig extractorConfig = null;
+ if (configClass != null) {
+ extractorConfig = PluginManager.get().createInstance(configClass);
+ extractorConfig.init(props);
+ }
+ _extractor = PluginManager.get().createInstance(extractorClass);
+ _extractor.init(fieldsToRead, extractorConfig);
_streamTopicName = topicName;
// Initialize Arrow allocator with configurable memory limit
- long allocatorLimit =
- Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, DEFAULT_ALLOCATOR_LIMIT));
+ long allocatorLimit = Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, DEFAULT_ALLOCATOR_LIMIT));
_allocator = new RootAllocator(allocatorLimit);
- // Initialize Arrow to GenericRow converter (processes all fields)
- _converter = new ArrowToGenericRowConverter();
-
- logger.info(
- "Initialized ArrowMessageDecoder for topic: {} with allocator limit: {} bytes",
- topicName,
+ LOGGER.info("Initialized ArrowMessageDecoder for topic: {} with allocator limit: {} bytes", topicName,
allocatorLimit);
Review Comment:
Not applicable — `StreamMessageDecoder.init` is invoked once per decoder
instance by the consuming framework; there is no re-init path in production.
Adding a defensive close would be dead code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]