This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e7f3a63a04a Add Apache Arrow format decoder to Pinot (#17031)
e7f3a63a04a is described below
commit e7f3a63a04ab0ecdcc5e617c25c5f4b55c822f66
Author: lnbest0707 <[email protected]>
AuthorDate: Tue Oct 21 09:52:09 2025 -0700
Add Apache Arrow format decoder to Pinot (#17031)
* Add Apache Arrow format decoder to Pinot
* Resolve comments
---
.../pinot-input-format/{ => pinot-arrow}/pom.xml | 62 +-
.../inputformat/arrow/ArrowMessageDecoder.java | 114 +++
.../arrow/ArrowToGenericRowConverter.java | 226 ++++++
.../inputformat/arrow/ArrowMessageDecoderTest.java | 762 +++++++++++++++++++++
.../inputformat/arrow/util/ArrowTestDataUtil.java | 607 ++++++++++++++++
pinot-plugins/pinot-input-format/pom.xml | 1 +
6 files changed, 1747 insertions(+), 25 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pom.xml
b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
similarity index 53%
copy from pinot-plugins/pinot-input-format/pom.xml
copy to pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
index e9aabe0079f..746c69b49b8 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
@@ -22,41 +22,53 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>pinot-plugins</artifactId>
+ <artifactId>pinot-input-format</artifactId>
<groupId>org.apache.pinot</groupId>
<version>1.5.0-SNAPSHOT</version>
</parent>
- <artifactId>pinot-input-format</artifactId>
- <packaging>pom</packaging>
- <name>Pinot Input Format</name>
+ <artifactId>pinot-arrow</artifactId>
+ <name>Pinot Arrow</name>
<url>https://pinot.apache.org/</url>
<properties>
- <pinot.root>${basedir}/../..</pinot.root>
- <plugin.type>pinot-input-format</plugin.type>
+ <pinot.root>${basedir}/../../..</pinot.root>
+ <shade.phase.prop>package</shade.phase.prop>
</properties>
- <modules>
- <module>pinot-avro</module>
- <module>pinot-avro-base</module>
- <module>pinot-clp-log</module>
- <module>pinot-confluent-avro</module>
- <module>pinot-confluent-json</module>
- <module>pinot-confluent-protobuf</module>
- <module>pinot-orc</module>
- <module>pinot-json</module>
- <module>pinot-parquet</module>
- <module>pinot-csv</module>
- <module>pinot-thrift</module>
- <module>pinot-protobuf</module>
- </modules>
-
<dependencies>
<dependency>
- <groupId>org.apache.pinot</groupId>
- <artifactId>pinot-spi</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-compression</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-format</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>pinot-fastdev</id>
+ <properties>
+ <shade.phase.prop>none</shade.phase.prop>
+ </properties>
+ </profile>
+ </profiles>
</project>
diff --git
a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
new file mode 100644
index 00000000000..297e0463829
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+
+import java.io.ByteArrayInputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into
Pinot GenericRow.
+ * This decoder handles Arrow streaming format and converts Arrow data to
Pinot's columnar format.
+ */
+public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
+ public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
+ public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB
default
+
+ private static final Logger logger =
LoggerFactory.getLogger(ArrowMessageDecoder.class);
+
+ private String _streamTopicName;
+ private RootAllocator _allocator;
+ private ArrowToGenericRowConverter _converter;
+
+ @Override
+ public void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
+ throws Exception {
+ _streamTopicName = topicName;
+
+ // Initialize Arrow allocator with configurable memory limit
+ long allocatorLimit =
+ Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT,
DEFAULT_ALLOCATOR_LIMIT));
+ _allocator = new RootAllocator(allocatorLimit);
+
+ // Initialize Arrow to GenericRow converter (processes all fields)
+ _converter = new ArrowToGenericRowConverter();
+
+ logger.info(
+ "Initialized ArrowMessageDecoder for topic: {} with allocator limit:
{} bytes",
+ topicName,
+ allocatorLimit);
+ }
+
+ @Nullable
+ @Override
+ public GenericRow decode(byte[] payload, GenericRow destination) {
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
+ ReadableByteChannel channel = Channels.newChannel(inputStream);
+ ArrowStreamReader reader = new ArrowStreamReader(channel, _allocator))
{
+
+ // Read the Arrow schema and data
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ if (!reader.loadNextBatch()) {
+ logger.warn("No data found in Arrow message for topic: {}",
_streamTopicName);
+ return null;
+ }
+
+ // Convert Arrow data to GenericRow using converter
+ GenericRow row = _converter.convert(reader, root, destination);
+
+ return row;
+ } catch (Exception e) {
+ logger.error(
+ "Error decoding Arrow message for stream topic {} : {}",
+ _streamTopicName,
+ Arrays.toString(payload),
+ e);
+ return null;
+ }
+ }
+
+ @Nullable
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow
destination) {
+ return decode(Arrays.copyOfRange(payload, offset, offset + length),
destination);
+ }
+
+ /** Clean up resources */
+ public void close() {
+ if (_allocator != null) {
+ try {
+ _allocator.close();
+ } catch (Exception e) {
+ logger.warn("Error closing Arrow allocator", e);
+ }
+ }
+ }
+}
diff --git
a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
new file mode 100644
index 00000000000..2cdafc49478
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.util.Text;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for converting Apache Arrow VectorSchemaRoot to Pinot {@code
GenericRow}. Processes
+ * all fields and handles multiple rows from Arrow batch.
+ */
+public class ArrowToGenericRowConverter {
+ private static final Logger logger =
LoggerFactory.getLogger(ArrowToGenericRowConverter.class);
+
+ /** Default constructor that processes all fields from Arrow batch. */
+ public ArrowToGenericRowConverter() {
+ logger.debug("ArrowToGenericRowConverter created for processing all
fields");
+ }
+
+ /**
+ * Converts an Arrow VectorSchemaRoot to a Pinot {@code GenericRow}.
Processes ALL rows from the
+ * Arrow batch and stores them as a list using MULTIPLE_RECORDS_KEY.
+ *
+ * @param reader ArrowStreamReader containing the data
+ * @param root Arrow VectorSchemaRoot containing the data
+ * @param destination Optional destination {@code GenericRow}, will create
new if null
+ * @return {@code GenericRow} containing {@code List<GenericRow>} with all
converted rows, or null
+ * if no data available
+ */
+ @Nullable
+ public GenericRow convert(
+ ArrowStreamReader reader, VectorSchemaRoot root, GenericRow destination)
{
+ if (root == null) {
+ logger.warn("Cannot convert null VectorSchemaRoot");
+ return null;
+ }
+
+ if (destination == null) {
+ destination = new GenericRow();
+ }
+
+ int rowCount = root.getRowCount();
+ if (rowCount == 0) {
+ logger.warn("No rows found in Arrow data");
+ return destination;
+ }
+
+ List<GenericRow> rows = new ArrayList<>(rowCount);
+
+ // Process all rows from the Arrow batch
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ GenericRow row = convertSingleRow(reader, root, rowIndex);
+ if (row != null) {
+ rows.add(row);
+ }
+ }
+
+ if (!rows.isEmpty()) {
+ // Use Pinot's MULTIPLE_RECORDS_KEY to store the list of rows
+ destination.putValue(GenericRow.MULTIPLE_RECORDS_KEY, rows);
+ logger.debug("Converted {} rows from Arrow batch", rows.size());
+ }
+
+ return destination;
+ }
+
+ /**
+ * Converts a single row from Arrow VectorSchemaRoot.
+ *
+ * @param reader ArrowStreamReader containing the data
+ * @param root Arrow VectorSchemaRoot containing the data
+ * @param rowIndex Index of the row to convert (0-based)
+ * @return {@code GenericRow} with converted data, or null if row index is
invalid
+ */
+ @Nullable
+ private GenericRow convertSingleRow(
+ ArrowStreamReader reader, VectorSchemaRoot root, int rowIndex) {
+ GenericRow row = new GenericRow();
+ int convertedFields = 0;
+
+ // Process all fields in the Arrow schema
+ for (int i = 0; i < root.getFieldVectors().size(); i++) {
+ Object value;
+
+ FieldVector fieldVector = root.getFieldVectors().get(i);
+ String fieldName = fieldVector.getField().getName();
+ try {
+ if (fieldVector.getField().getDictionary() != null) {
+ long dictionaryId = fieldVector.getField().getDictionary().getId();
+ try (ValueVector realFieldVector =
+ DictionaryEncoder.decode(
+ fieldVector,
reader.getDictionaryVectors().get(dictionaryId))) {
+ value = realFieldVector.getObject(rowIndex);
+ }
+ } else {
+ value = fieldVector.getObject(rowIndex);
+ }
+ if (value != null) {
+ // Convert Arrow-specific types to Pinot-compatible types
+ Object pinotCompatibleValue =
convertArrowTypeToPinotCompatible(value);
+ row.putValue(fieldName, pinotCompatibleValue);
+ convertedFields++;
+ }
+ } catch (Exception e) {
+ logger.error("Error extracting value for field: {} at row {}",
fieldName, rowIndex, e);
+ }
+ }
+
+ logger.debug("Converted {} fields from Arrow row {} to GenericRow",
convertedFields, rowIndex);
+ return row;
+ }
+
+ /**
+ * Converts Arrow-specific data types to Pinot-compatible types. This method
handles the
+ * incompatibility issues between Arrow's native data types and what Pinot
expects.
+ *
+ * @param value The raw value from Arrow fieldVector.getObject()
+ * @return A Pinot-compatible version of the value
+ */
+ @Nullable
+ private Object convertArrowTypeToPinotCompatible(@Nullable Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ // Handle nested List and Map values, including Arrow MapVector's
representation
+ if (value instanceof List) {
+ List<?> originalList = (List<?>) value;
+ if (!originalList.isEmpty()) {
+ boolean looksLikeMapEntries = true;
+ boolean sawNonNull = false;
+ for (Object entryObj : originalList) {
+ if (entryObj == null) {
+ continue;
+ }
+ sawNonNull = true;
+ if (!(entryObj instanceof Map)) {
+ looksLikeMapEntries = false;
+ break;
+ }
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> entryMap = (Map<Object, Object>) entryObj;
+ if (!entryMap.containsKey(MapVector.KEY_NAME)) {
+ looksLikeMapEntries = false;
+ break;
+ }
+ }
+ if (looksLikeMapEntries && sawNonNull) {
+ Map<String, Object> flattened = new
LinkedHashMap<>(originalList.size());
+ for (Object entryObj : originalList) {
+ if (entryObj == null) {
+ continue;
+ }
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> entryMap = (Map<Object, Object>) entryObj;
+ Object rawKey = entryMap.get(MapVector.KEY_NAME);
+ Object rawVal = entryMap.get(MapVector.VALUE_NAME);
+ Object convertedKey = convertArrowTypeToPinotCompatible(rawKey);
+ Object convertedVal = convertArrowTypeToPinotCompatible(rawVal);
+ flattened.put(String.valueOf(convertedKey), convertedVal);
+ }
+ return flattened;
+ }
+ }
+
+ List<Object> convertedList = new ArrayList<>(originalList.size());
+ for (Object element : originalList) {
+ convertedList.add(convertArrowTypeToPinotCompatible(element));
+ }
+ return convertedList;
+ }
+
+ // Handle Arrow Text type -> String conversion
+ if (value instanceof Text) {
+ // Arrow VarCharVector.getObject() returns Text objects, but Pinot
expects String
+ return value.toString();
+ }
+
+ // Handle Arrow LocalDateTime -> java.sql.Timestamp conversion
+ if (value instanceof LocalDateTime) {
+ // Arrow TimeStampMilliVector.getObject() returns LocalDateTime, but
Pinot expects
+ // java.sql.Timestamp objects for proper timestamp handling and native
support
+ LocalDateTime dateTime = (LocalDateTime) value;
+ return Timestamp.from(dateTime.toInstant(ZoneOffset.UTC));
+ }
+
+ // Handle other potential Arrow-specific types that might cause issues
+
+ // For primitive types (Integer, Double, Boolean) and other Java standard
types,
+ // Arrow returns standard Java objects that are already Pinot-compatible
+ return value;
+ }
+}
diff --git
a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
new file mode 100644
index 00000000000..a6b8adc9ca9
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.plugin.inputformat.arrow.util.ArrowTestDataUtil;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ArrowMessageDecoderTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ArrowMessageDecoderTest.class);
+
+  /** Initializes and closes one decoder with an explicit 64MB allocator limit and one with no limit configured. */
+  @Test
+  public void testArrowMessageDecoderWithDifferentAllocatorLimits()
+      throws Exception {
+    Set<String> fields = Sets.newHashSet("field1");
+    String topic = "test-topic-custom";
+
+    // Explicit allocator limit
+    Map<String, String> customLimitProps = new HashMap<>();
+    customLimitProps.put(ArrowMessageDecoder.ARROW_ALLOCATOR_LIMIT, "67108864"); // 64MB
+    ArrowMessageDecoder customLimitDecoder = new ArrowMessageDecoder();
+    customLimitDecoder.init(customLimitProps, fields, topic);
+    customLimitDecoder.close();
+
+    // No allocator limit configured
+    Map<String, String> noLimitProps = new HashMap<>();
+    ArrowMessageDecoder defaultLimitDecoder = new ArrowMessageDecoder();
+    defaultLimitDecoder.init(noLimitProps, fields, topic);
+    defaultLimitDecoder.close();
+  }
+
+  /** Calls init twice on the same decoder instance; neither call nor the final close may throw. */
+  @Test
+  public void testArrowMessageDecoderMultipleInits()
+      throws Exception {
+    Map<String, String> emptyProps = new HashMap<>();
+    Set<String> fields = Sets.newHashSet("id");
+    String topic = "test-multiple-init";
+
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+    for (int attempt = 0; attempt < 2; attempt++) {
+      decoder.init(emptyProps, fields, topic);
+    }
+
+    decoder.close();
+  }
+
+  /** Feeds null, garbage and empty payloads to the decoder; every one of them must decode to null. */
+  @Test
+  public void testArrowMessageDecodingWithInvalidData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "age");
+    String topicName = "test-arrow-topic";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Explicit charset keeps the bytes independent of the platform default
+      byte[] invalidData1 = "invalid arrow data".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      byte[] invalidData2 = new byte[]{1, 2, 3, 4, 5};
+      byte[] emptyData = new byte[0];
+
+      GenericRow destination = new GenericRow();
+
+      // Should return null for all invalid data types and null
+      assertNull(decoder.decode(null, destination));
+      assertNull(decoder.decode(invalidData1, destination));
+      assertNull(decoder.decode(invalidData2, destination));
+      assertNull(decoder.decode(emptyData, destination));
+
+      // Test with null destination
+      assertNull(decoder.decode(invalidData1, null));
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /** Closes an initialized decoder three times in a row; repeated closing must not throw. */
+  @Test
+  public void testArrowMessageDecoderCloseMultipleTimes()
+      throws Exception {
+    Map<String, String> emptyProps = new HashMap<>();
+    Set<String> fields = Sets.newHashSet("id");
+    String topic = "test-multiple-close";
+
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+    decoder.init(emptyProps, fields, topic);
+
+    for (int attempt = 0; attempt < 3; attempt++) {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes a one-row Arrow message into a pre-populated destination row and checks that the same instance is
+   * returned, its existing value is kept, and the converted row is attached under MULTIPLE_RECORDS_KEY.
+   */
+  @Test
+  public void testArrowMessageDecoderWithArrowDataAndDestination()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name");
+    String topicName = "test-real-arrow-with-destination";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create real Arrow IPC data
+      byte[] realArrowData = ArrowTestDataUtil.createValidArrowIpcData(1);
+
+      // Test with provided destination containing existing data
+      GenericRow destination = new GenericRow();
+      destination.putValue("existing_field", "existing_value");
+
+      GenericRow result = decoder.decode(realArrowData, destination);
+
+      // TestNG assertions take (actual, expected)
+      assertSame(result, destination);
+
+      // Should preserve existing data
+      assertEquals(result.getValue("existing_field"), "existing_value");
+
+      // Should contain new converted Arrow data
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(rows.size(), 1);
+      assertEquals(rows.get(0).getValue("id"), 1);
+      assertEquals(rows.get(0).getValue("name"), "name_1");
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes an Arrow message produced by {@code createEmptyArrowIpcData()}. Either a null result or a result without
+   * any converted rows is accepted.
+   */
+  @Test
+  public void testArrowMessageDecoderWithEmptyData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name");
+    String topicName = "test-empty-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Test with empty Arrow data (zero batches)
+      byte[] emptyArrowData = ArrowTestDataUtil.createEmptyArrowIpcData();
+      GenericRow result = decoder.decode(emptyArrowData, null);
+
+      // Should handle empty data gracefully - might return null or empty result
+      if (result != null) {
+        @SuppressWarnings("unchecked")
+        List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+        if (rows != null) {
+          // TestNG assertions take (actual, expected)
+          assertEquals(rows.size(), 0);
+        }
+      }
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /** Decodes a three-row batch with int, string, double, boolean and timestamp columns and checks two rows. */
+  @Test
+  public void testArrowMessageDecoderWithMultipleDataTypes()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp");
+    String topicName = "test-multi-type-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with multiple data types
+      byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3);
+      GenericRow result = decoder.decode(multiTypeArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 3);
+
+      // Verify different data types are correctly handled
+      GenericRow row0 = rows.get(0);
+      assertEquals(row0.getValue("id"), 1);
+      assertEquals(row0.getValue("name").toString(), "product_1");
+      assertEquals(((Double) row0.getValue("price")).doubleValue(), 10.99, 0.01);
+      assertEquals(row0.getValue("active"), Boolean.TRUE); // BitVector returns boolean
+      assertNotNull(row0.getValue("timestamp")); // Timestamp should be present
+
+      GenericRow row1 = rows.get(1);
+      assertEquals(row1.getValue("id"), 2);
+      assertEquals(row1.getValue("name").toString(), "product_2");
+      assertEquals(((Double) row1.getValue("price")).doubleValue(), 15.99, 0.01);
+      assertEquals(row1.getValue("active"), Boolean.FALSE);
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+ @Test
+ public void testArrowMessageDecoderWithBatchContainingMultipleRows()
+ throws Exception {
+ ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+ Map<String, String> props = new HashMap<>();
+ Set<String> fieldsToRead = Sets.newHashSet("id", "batch_num", "value");
+ String topicName = "test-multi-batch-arrow-data";
+
+ decoder.init(props, fieldsToRead, topicName);
+
+ // Create Arrow data with multiple batches - but note: ArrowMessageDecoder
processes one batch
+ // per decode() call
+ // So we test with a single batch containing multiple rows instead
+ byte[] multiBatchArrowData =
+ ArrowTestDataUtil.createMultiBatchArrowIpcData(1, 3); // 1 batch, 3
rows
+ GenericRow result = decoder.decode(multiBatchArrowData, null);
+
+ assertNotNull(result);
+ @SuppressWarnings("unchecked")
+ List<GenericRow> rows = (List<GenericRow>)
result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+ assertNotNull(rows);
+ assertEquals(3, rows.size()); // 1 batch × 3 rows = 3 total rows
+
+ // Verify data from the batch
+ GenericRow row0 = rows.get(0);
+ assertEquals(1, row0.getValue("id"));
+ assertEquals(0, row0.getValue("batch_num"));
+ assertEquals("batch_0_row_0", row0.getValue("value").toString());
+
+ GenericRow row1 = rows.get(1);
+ assertEquals(2, row1.getValue("id"));
+ assertEquals(0, row1.getValue("batch_num"));
+ assertEquals("batch_0_row_1", row1.getValue("value").toString());
+
+ GenericRow row2 = rows.get(2);
+ assertEquals(3, row2.getValue("id"));
+ assertEquals(0, row2.getValue("batch_num"));
+ assertEquals("batch_0_row_2", row2.getValue("value").toString());
+
+ decoder.close();
+ }
+
+  /** Decodes an eight-row batch with a dictionary-encoded "category" column and checks the resolved strings. */
+  @Test
+  public void testArrowMessageDecoderWithDictionaryEncodedData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "category", "price");
+    String topicName = "test-dictionary-encoded-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with real dictionary encoding
+      byte[] dictionaryArrowData = ArrowTestDataUtil.createDictionaryEncodedArrowIpcData(8);
+      GenericRow result = decoder.decode(dictionaryArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 8);
+
+      // Verify dictionary-encoded values are properly decoded by ArrowToGenericRowConverter
+      // Dictionary: id=1 -> "Electronics", id=2 -> "Books", id=3 -> "Clothing", id=4 -> "Home"
+      // Data cycles through indices 0,1,2,3,0,1,2,3 which should be resolved to string values
+
+      GenericRow row0 = rows.get(0);
+      assertEquals(row0.getValue("id"), 1);
+      assertEquals(row0.getValue("category"), "Electronics");
+      assertEquals(((Double) row0.getValue("price")).doubleValue(), 19.99, 0.01);
+
+      GenericRow row1 = rows.get(1);
+      assertEquals(row1.getValue("id"), 2);
+      assertEquals(row1.getValue("category"), "Books");
+      assertEquals(((Double) row1.getValue("price")).doubleValue(), 29.99, 0.01);
+
+      GenericRow row2 = rows.get(2);
+      assertEquals(row2.getValue("id"), 3);
+      assertEquals(row2.getValue("category"), "Clothing");
+      assertEquals(((Double) row2.getValue("price")).doubleValue(), 39.99, 0.01);
+
+      GenericRow row3 = rows.get(3);
+      assertEquals(row3.getValue("id"), 4);
+      assertEquals(row3.getValue("category"), "Home");
+      assertEquals(((Double) row3.getValue("price")).doubleValue(), 49.99, 0.01);
+
+      // Verify cycling continues - row 4 should have same category as row 0
+      GenericRow row4 = rows.get(4);
+      assertEquals(row4.getValue("id"), 5);
+      assertEquals(row4.getValue("category"), "Electronics");
+      assertEquals(((Double) row4.getValue("price")).doubleValue(), 59.99, 0.01);
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /**
+   * Checks the Java types of the values produced for a multi-type batch: Integer, String (converted from Arrow Text),
+   * Double, Boolean and java.sql.Timestamp (converted from LocalDateTime).
+   */
+  @Test
+  public void testArrowDataTypeCompatibility()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp");
+    String topicName = "test-data-type-compatibility";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with multiple data types to verify compatibility
+      byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3);
+      GenericRow result = decoder.decode(multiTypeArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 3);
+
+      // Check the actual data types returned by Arrow and verify Pinot compatibility
+      GenericRow row0 = rows.get(0);
+
+      Object idValue = row0.getValue("id");
+      assertNotNull(idValue, "ID should not be null");
+      assertTrue(idValue instanceof Integer, "ID should be Integer compatible");
+
+      Object nameValue = row0.getValue("name");
+      assertNotNull(nameValue, "Name should not be null");
+      // After conversion, Arrow Text should be converted to String for Pinot compatibility
+      assertTrue(nameValue instanceof String, "Name should be String after conversion");
+      assertEquals(nameValue, "product_1");
+      LOGGER.info("Arrow name field successfully converted to String: {}", nameValue);
+
+      Object priceValue = row0.getValue("price");
+      assertNotNull(priceValue, "Price should not be null");
+      assertTrue(priceValue instanceof Double, "Price should be Double compatible");
+
+      Object activeValue = row0.getValue("active");
+      assertNotNull(activeValue, "Active should not be null");
+      // BitVector.getObject() returns Boolean
+      assertTrue(activeValue instanceof Boolean, "Active should be Boolean compatible");
+
+      Object timestampValue = row0.getValue("timestamp");
+      assertNotNull(timestampValue, "Timestamp should not be null");
+      // After conversion, Arrow LocalDateTime should be converted to java.sql.Timestamp for Pinot compatibility
+      assertTrue(timestampValue instanceof java.sql.Timestamp,
+          "Timestamp should be java.sql.Timestamp after conversion");
+      java.sql.Timestamp ts = (java.sql.Timestamp) timestampValue;
+      assertTrue(ts.getTime() > 0, "Timestamp should be a positive value");
+      LOGGER.info("Arrow timestamp field successfully converted to java.sql.Timestamp: {}", timestampValue);
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /** Decodes a three-row batch with list-typed "numbers" and "tags" columns and checks the converted lists. */
+  @Test
+  public void testArrowMessageDecoderWithListVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "numbers", "tags");
+    String topicName = "test-list-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with List vectors
+      byte[] listArrowData = ArrowTestDataUtil.createListArrowIpcData(3);
+      GenericRow result = decoder.decode(listArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 3);
+
+      // Verify first row - should have 1 number and 2 tags
+      GenericRow row0 = rows.get(0);
+      assertEquals(row0.getValue("id"), 1);
+      Object numbersValue0 = row0.getValue("numbers");
+      assertNotNull(numbersValue0, "Numbers should not be null");
+      assertTrue(numbersValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList0 = (List<Object>) numbersValue0;
+      assertEquals(numbersList0.size(), 1);
+      assertEquals(numbersList0.get(0), 10);
+
+      Object tagsValue0 = row0.getValue("tags");
+      assertNotNull(tagsValue0, "Tags should not be null");
+      assertTrue(tagsValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> tagsList0 = (List<Object>) tagsValue0;
+      assertEquals(tagsList0.size(), 2);
+      assertEquals(tagsList0.get(0).toString(), "tag_0_0");
+      assertEquals(tagsList0.get(1).toString(), "tag_0_1");
+
+      // Verify second row - should have 2 numbers and 2 tags
+      GenericRow row1 = rows.get(1);
+      assertEquals(row1.getValue("id"), 2);
+      Object numbersValue1 = row1.getValue("numbers");
+      assertNotNull(numbersValue1);
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList1 = (List<Object>) numbersValue1;
+      assertEquals(numbersList1.size(), 2);
+      assertEquals(numbersList1.get(0), 20);
+      assertEquals(numbersList1.get(1), 21);
+
+      // Verify third row - should have 3 numbers
+      GenericRow row2 = rows.get(2);
+      assertEquals(row2.getValue("id"), 3);
+      Object numbersValue2 = row2.getValue("numbers");
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList2 = (List<Object>) numbersValue2;
+      assertEquals(numbersList2.size(), 3);
+      assertEquals(numbersList2.get(0), 30);
+      assertEquals(numbersList2.get(1), 31);
+      assertEquals(numbersList2.get(2), 32);
+
+      LOGGER.info("List vector test completed successfully with {} rows", rows.size());
+    } finally {
+      // Release the allocator even when an assertion fails
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes a two-row payload from {@code ArrowTestDataUtil.createStructArrowIpcData} and checks that the
+   * {@code person} column is exposed as a {@link Map} containing a nested {@code address} map.
+   */
+  @Test
+  public void testArrowMessageDecoderWithStructVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "person");
+    String topicName = "test-struct-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // The decoder is closed in finally so that a failing assertion cannot leave it open.
+    try {
+      // Create Arrow data with Struct vectors
+      byte[] structArrowData = ArrowTestDataUtil.createStructArrowIpcData(2);
+      GenericRow result = decoder.decode(structArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row with nested struct
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object personValue0 = row0.getValue("person");
+      assertNotNull(personValue0);
+      assertTrue(personValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> personMap0 = (Map<String, Object>) personValue0;
+      assertEquals("Person_1", personMap0.get("name").toString());
+      assertEquals(25, personMap0.get("age"));
+      @SuppressWarnings("unchecked")
+      Map<String, Object> address0 = (Map<String, Object>) personMap0.get("address");
+      assertEquals("1 Main St", address0.get("street").toString());
+      assertEquals("City_1", address0.get("city").toString());
+
+      // Verify second row
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object personValue1 = row1.getValue("person");
+      assertNotNull(personValue1);
+      assertTrue(personValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> personMap1 = (Map<String, Object>) personValue1;
+      assertEquals("Person_2", personMap1.get("name").toString());
+      assertEquals(26, personMap1.get("age"));
+      @SuppressWarnings("unchecked")
+      Map<String, Object> address1 = (Map<String, Object>) personMap1.get("address");
+      assertEquals("2 Main St", address1.get("street").toString());
+      assertEquals("City_2", address1.get("city").toString());
+
+      LOGGER.info("Struct vector test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes a two-row payload from {@code ArrowTestDataUtil.createMapArrowIpcData} and checks that the
+   * {@code metadata} column is exposed as a {@link Map} holding the generated integer values.
+   */
+  @Test
+  public void testArrowMessageDecoderWithMapVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "metadata");
+    String topicName = "test-map-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // The decoder is closed in finally so that a failing assertion cannot leave it open.
+    try {
+      // Create Arrow data with Map vectors
+      byte[] mapArrowData = ArrowTestDataUtil.createMapArrowIpcData(2);
+      GenericRow result = decoder.decode(mapArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row with map data
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object metadataValue0 = row0.getValue("metadata");
+      assertNotNull(metadataValue0);
+      assertTrue(metadataValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> meta0 = (Map<String, Object>) metadataValue0;
+      // Only values are checked; the assertions do not depend on the decoded key type.
+      assertTrue(meta0.containsValue(100));
+      assertTrue(meta0.containsValue(101));
+
+      // Verify second row - should have 3 entries (2 + (1%2) = 3)
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object metadataValue1 = row1.getValue("metadata");
+      assertNotNull(metadataValue1);
+      assertTrue(metadataValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> meta1 = (Map<String, Object>) metadataValue1;
+      assertTrue(meta1.containsValue(200));
+      assertTrue(meta1.containsValue(201));
+      assertTrue(meta1.containsValue(202));
+
+      LOGGER.info("Map vector test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes a two-row payload from {@code ArrowTestDataUtil.createNestedMapArrowIpcData} and checks that
+   * {@code metadata} is a {@link Map} whose values are themselves maps.
+   */
+  @Test
+  public void testArrowMessageDecoderWithNestedMapValues()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "metadata");
+    String topicName = "test-nested-map-values";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // The decoder is closed in finally so that a failing assertion cannot leave it open.
+    try {
+      // Create Arrow data with Map values that are themselves Maps
+      byte[] nestedMapArrowData = ArrowTestDataUtil.createNestedMapArrowIpcData(2);
+      GenericRow result = decoder.decode(nestedMapArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row: metadata is a Map<String, Map<String, Integer>>
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object metadataValue0 = row0.getValue("metadata");
+      assertNotNull(metadataValue0);
+      assertTrue(metadataValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> outer0 = (Map<String, Object>) metadataValue0;
+      assertTrue(outer0.size() >= 2);
+      for (Object innerMapObj : outer0.values()) {
+        assertTrue(innerMapObj instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> inner = (Map<String, Object>) innerMapObj;
+        assertTrue(inner.size() >= 2);
+        // Values should be integers from generator
+        for (Object v : inner.values()) {
+          assertTrue(v instanceof Integer);
+        }
+      }
+
+      // Verify second row similarly
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object metadataValue1 = row1.getValue("metadata");
+      assertNotNull(metadataValue1);
+      assertTrue(metadataValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> outer1 = (Map<String, Object>) metadataValue1;
+      assertTrue(outer1.size() >= 2);
+      // At least one inner map of the second row must have exactly three entries.
+      boolean sawThreeInner = false;
+      for (Object innerMapObj : outer1.values()) {
+        assertTrue(innerMapObj instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> inner = (Map<String, Object>) innerMapObj;
+        if (inner.size() == 3) {
+          sawThreeInner = true;
+        }
+      }
+      assertTrue(sawThreeInner);
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes a three-row payload from {@code ArrowTestDataUtil.createNestedListStructArrowIpcData} and checks
+   * that {@code items} is a {@link List} whose elements are maps with {@code item_name} and
+   * {@code item_price} entries.
+   */
+  @Test
+  public void testArrowMessageDecoderWithNestedListStruct()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "items");
+    String topicName = "test-nested-list-struct";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // The decoder is closed in finally so that a failing assertion cannot leave it open.
+    try {
+      // Create Arrow data with nested List of Structs
+      byte[] nestedArrowData = ArrowTestDataUtil.createNestedListStructArrowIpcData(3);
+      GenericRow result = decoder.decode(nestedArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(3, rows.size());
+
+      // Verify first row - should have 1 item (1 + (0%3) = 1)
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object itemsValue0 = row0.getValue("items");
+      assertNotNull(itemsValue0);
+      assertTrue(itemsValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> items0 = (List<Object>) itemsValue0;
+      assertEquals(1, items0.size());
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item00 = (Map<String, Object>) items0.get(0);
+      assertEquals("item_0_0", item00.get("item_name").toString());
+      assertEquals(10.99, (Double) item00.get("item_price"), 0.01);
+
+      // Verify second row - should have 2 items (1 + (1%3) = 2)
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object itemsValue1 = row1.getValue("items");
+      assertNotNull(itemsValue1);
+      @SuppressWarnings("unchecked")
+      List<Object> items1 = (List<Object>) itemsValue1;
+      assertEquals(2, items1.size());
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item10 = (Map<String, Object>) items1.get(0);
+      assertEquals("item_1_0", item10.get("item_name").toString());
+      assertEquals(15.99, (Double) item10.get("item_price"), 0.01);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item11 = (Map<String, Object>) items1.get(1);
+      assertEquals("item_1_1", item11.get("item_name").toString());
+      assertEquals(16.99, (Double) item11.get("item_price"), 0.01);
+
+      // Verify third row - should have 3 items (1 + (2%3) = 3)
+      GenericRow row2 = rows.get(2);
+      assertEquals(3, row2.getValue("id"));
+      Object itemsValue2 = row2.getValue("items");
+      assertNotNull(itemsValue2);
+      @SuppressWarnings("unchecked")
+      List<Object> items2 = (List<Object>) itemsValue2;
+      assertEquals(3, items2.size());
+
+      LOGGER.info("Nested List-Struct test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Feeds one-row list, struct, map and list-of-struct payloads through a single decoder instance and checks
+   * that each result carries at least one row with the expected nested column present.
+   */
+  @Test
+  public void testArrowNestedStructureCompatibilityWithPinot()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "numbers", "person", "metadata", "items");
+    String topicName = "test-nested-compatibility";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // The decoder is closed in finally so that a failing assertion cannot leave it open.
+    try {
+      // Test each nested structure type individually for compatibility
+      // Test List compatibility
+      byte[] listData = ArrowTestDataUtil.createListArrowIpcData(1);
+      GenericRow listResult = decoder.decode(listData, null);
+      assertNotNull(listResult, "List data should be decodable");
+
+      // Test Struct compatibility
+      byte[] structData = ArrowTestDataUtil.createStructArrowIpcData(1);
+      GenericRow structResult = decoder.decode(structData, null);
+      assertNotNull(structResult, "Struct data should be decodable");
+
+      // Test Map compatibility
+      byte[] mapData = ArrowTestDataUtil.createMapArrowIpcData(1);
+      GenericRow mapResult = decoder.decode(mapData, null);
+      assertNotNull(mapResult, "Map data should be decodable");
+
+      // Test complex nested structures
+      byte[] nestedData = ArrowTestDataUtil.createNestedListStructArrowIpcData(1);
+      GenericRow nestedResult = decoder.decode(nestedData, null);
+      assertNotNull(nestedResult, "Nested List-Struct data should be decodable");
+
+      // Verify that all simulated nested structures produce valid GenericRow objects
+      @SuppressWarnings("unchecked")
+      List<GenericRow> listRows = (List<GenericRow>) listResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(listRows, "List result should contain rows");
+      assertTrue(listRows.size() > 0, "List result should have at least one row");
+
+      // Verify nested list data is accessible
+      GenericRow firstListRow = listRows.get(0);
+      assertNotNull(firstListRow.getValue("numbers"), "List row should have numbers");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> structRows = (List<GenericRow>) structResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(structRows, "Struct result should contain rows");
+      assertTrue(structRows.size() > 0, "Struct result should have at least one row");
+
+      // Verify struct data is accessible
+      GenericRow firstStructRow = structRows.get(0);
+      assertNotNull(firstStructRow.getValue("person"), "Struct row should have person");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> mapRows = (List<GenericRow>) mapResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(mapRows, "Map result should contain rows");
+      assertTrue(mapRows.size() > 0, "Map result should have at least one row");
+
+      // Verify map data is accessible
+      GenericRow firstMapRow = mapRows.get(0);
+      assertNotNull(firstMapRow.getValue("metadata"), "Map row should have metadata");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> nestedRows = (List<GenericRow>) nestedResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(nestedRows, "Nested result should contain rows");
+      assertTrue(nestedRows.size() > 0, "Nested result should have at least one row");
+
+      // Verify nested list-struct data is accessible
+      GenericRow firstNestedRow = nestedRows.get(0);
+      assertNotNull(firstNestedRow.getValue("items"), "Nested row should have items");
+
+      LOGGER.info(
+          "All nested structure types are compatible with ArrowMessageDecoder and produce valid GenericRow objects");
+    } finally {
+      decoder.close();
+    }
+  }
+}
diff --git
a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
new file mode 100644
index 00000000000..206dc7d85a5
--- /dev/null
+++
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow.util;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+
+public class ArrowTestDataUtil {
+
+  // Private constructor: the helpers in this class are static, so it is never instantiated.
+  private ArrowTestDataUtil() {
+  }
+
+  /**
+   * Builds a batch with a nullable int32 {@code id} column and a nullable UTF-8 {@code name} column.
+   * Row {@code i} (0-based) holds {@code id = i + 1} and {@code name = "name_" + (i + 1)}.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root (no dictionary provider)
+   * @throws Exception if building or serializing the batch fails
+   */
+  public static byte[] createValidArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Schema schema = new Schema(Arrays.asList(idField, nameField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector nameVector = (VarCharVector) root.getVector("name");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        nameVector.allocateNew(numRows * 10, numRows);
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          // The column is declared Utf8, so encode explicitly instead of using the platform default charset.
+          nameVector.set(i, ("name_" + (i + 1)).getBytes(StandardCharsets.UTF_8));
+        }
+
+        idVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds a batch with five nullable columns: {@code id} (int32), {@code name} (UTF-8), {@code price}
+   * (double), {@code active} (bool) and {@code timestamp} (millisecond timestamp, no time zone).
+   * Row {@code i} (0-based) holds {@code id = i + 1}, {@code name = "product_" + (i + 1)},
+   * {@code price = 10.99 + 5.0 * i}, {@code active} set on even {@code i}, and a timestamp of the wall-clock
+   * time captured once before the loop plus {@code i} seconds.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root (no dictionary provider)
+   * @throws Exception if building or serializing the batch fails
+   */
+  public static byte[] createMultiTypeArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field priceField =
+          new Field("price", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null);
+      Field activeField = new Field("active", FieldType.nullable(new ArrowType.Bool()), null);
+      Field timestampField =
+          new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), null);
+
+      Schema schema = new Schema(Arrays.asList(idField, nameField, priceField, activeField, timestampField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector nameVector = (VarCharVector) root.getVector("name");
+        Float8Vector priceVector = (Float8Vector) root.getVector("price");
+        BitVector activeVector = (BitVector) root.getVector("active");
+        TimeStampMilliVector timestampVector = (TimeStampMilliVector) root.getVector("timestamp");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        nameVector.allocateNew(numRows * 20, numRows);
+        priceVector.allocateNew(numRows);
+        activeVector.allocateNew(numRows);
+        timestampVector.allocateNew(numRows);
+
+        long baseTime = System.currentTimeMillis();
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          // The column is declared Utf8, so encode explicitly instead of using the platform default charset.
+          nameVector.set(i, ("product_" + (i + 1)).getBytes(StandardCharsets.UTF_8));
+          priceVector.set(i, 10.99 + (i * 5.0));
+          activeVector.set(i, i % 2 == 0 ? 1 : 0);
+          timestampVector.set(i, baseTime + (i * 1000L));
+        }
+
+        idVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        priceVector.setValueCount(numRows);
+        activeVector.setValueCount(numRows);
+        timestampVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Writes {@code batchCount} record batches of {@code rowsPerBatch} rows each through one
+   * {@link ArrowStreamWriter}. Columns are {@code id} (int32, numbered from 1 and continuing across
+   * batches), {@code batch_num} (int32, 0-based batch index) and {@code value} (UTF-8,
+   * {@code "batch_" + batch + "_row_" + row}).
+   *
+   * @param batchCount number of batches to write
+   * @param rowsPerBatch number of rows in every batch
+   * @return the bytes accumulated in the output stream after {@code writer.end()}
+   * @throws Exception if building or writing a batch fails
+   */
+  public static byte[] createMultiBatchArrowIpcData(int batchCount, int rowsPerBatch)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field batchField = new Field("batch_num", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field valueField = new Field("value", FieldType.nullable(new ArrowType.Utf8()), null);
+      Schema schema = new Schema(Arrays.asList(idField, batchField, valueField));
+
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try (WritableByteChannel channel = Channels.newChannel(outputStream);
+          VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+          ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) {
+
+        writer.start();
+
+        IntVector idVector = (IntVector) root.getVector("id");
+        IntVector batchVector = (IntVector) root.getVector("batch_num");
+        VarCharVector valueVector = (VarCharVector) root.getVector("value");
+
+        int totalRowId = 1;
+        for (int batch = 0; batch < batchCount; batch++) {
+          // The same root is re-allocated and refilled for every batch before it is written.
+          root.allocateNew();
+          idVector.allocateNew(rowsPerBatch);
+          batchVector.allocateNew(rowsPerBatch);
+          valueVector.allocateNew(rowsPerBatch * 15, rowsPerBatch);
+
+          for (int row = 0; row < rowsPerBatch; row++) {
+            idVector.set(row, totalRowId++);
+            batchVector.set(row, batch);
+            // The column is declared Utf8, so encode explicitly instead of using the platform default charset.
+            valueVector.set(row, ("batch_" + batch + "_row_" + row).getBytes(StandardCharsets.UTF_8));
+          }
+
+          idVector.setValueCount(rowsPerBatch);
+          batchVector.setValueCount(rowsPerBatch);
+          valueVector.setValueCount(rowsPerBatch);
+          root.setRowCount(rowsPerBatch);
+
+          writer.writeBatch();
+        }
+
+        writer.end();
+        return outputStream.toByteArray();
+      }
+    }
+  }
+
+  /**
+   * Produces a stream for an {@code id} (int32) / {@code name} (UTF-8) schema in which the writer is started
+   * and ended without any {@code writeBatch()} call.
+   *
+   * @return the bytes accumulated in the output stream once the writer has been closed
+   * @throws Exception if creating the root or writing the stream fails
+   */
+  public static byte[] createEmptyArrowIpcData()
+      throws Exception {
+    Schema emptySchema = new Schema(Arrays.asList(
+        new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null),
+        new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
+
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        VectorSchemaRoot emptyRoot = VectorSchemaRoot.create(emptySchema, allocator)) {
+      emptyRoot.setRowCount(0);
+
+      ByteArrayOutputStream sink = new ByteArrayOutputStream();
+      try (WritableByteChannel sinkChannel = Channels.newChannel(sink);
+          ArrowStreamWriter streamWriter = new ArrowStreamWriter(emptyRoot, null, sinkChannel)) {
+        // Start and end only; no record batch is written in between.
+        streamWriter.start();
+        streamWriter.end();
+      }
+
+      return sink.toByteArray();
+    }
+  }
+
+  /**
+   * Builds a batch with {@code id} (int), a dictionary-encoded {@code category} and {@code price} (double).
+   * The dictionary (id {@code 1}, int32 indices, unordered) holds "Electronics", "Books", "Clothing" and
+   * "Home". Row {@code i} (0-based) holds {@code id = i + 1}, the dictionary value at
+   * {@code i % dictionarySize}, and {@code price = 19.99 + 10.0 * i}.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the root and its dictionary provider
+   * @throws Exception if building, encoding or serializing the batch fails
+   */
+  public static byte[] createDictionaryEncodedArrowIpcData(int numRows)
+      throws Exception {
+    List<String> dictionaryValues = Arrays.asList("Electronics", "Books", "Clothing", "Home");
+    DictionaryEncoding dictionaryEncoding = new DictionaryEncoding(1L, false, new ArrowType.Int(32, true));
+
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        VarCharVector dictionaryVector = new VarCharVector("category_dict", allocator);
+        IntVector idVector = new IntVector("id", allocator);
+        Float8Vector priceVector = new Float8Vector("price", allocator);
+        VarCharVector categoryUnencoded =
+            new VarCharVector("category", new FieldType(true, new ArrowType.Utf8(), dictionaryEncoding), allocator)) {
+
+      dictionaryVector.allocateNew();
+      for (int i = 0; i < dictionaryValues.size(); i++) {
+        // Encode explicitly as UTF-8 rather than relying on the platform default charset.
+        dictionaryVector.set(i, dictionaryValues.get(i).getBytes(StandardCharsets.UTF_8));
+      }
+      dictionaryVector.setValueCount(dictionaryValues.size());
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, dictionaryEncoding);
+      DictionaryProvider.MapDictionaryProvider dictionaryProvider = new DictionaryProvider.MapDictionaryProvider();
+      dictionaryProvider.put(dictionary);
+
+      idVector.allocateNew(numRows);
+      priceVector.allocateNew(numRows);
+      categoryUnencoded.allocateNew(numRows);
+
+      for (int i = 0; i < numRows; i++) {
+        idVector.set(i, i + 1);
+        // Must use the same charset as the dictionary entries above so the encoder can match the bytes.
+        String category = dictionaryValues.get(i % dictionaryValues.size());
+        categoryUnencoded.set(i, category.getBytes(StandardCharsets.UTF_8));
+        priceVector.set(i, 19.99 + (i * 10.0));
+      }
+      idVector.setValueCount(numRows);
+      priceVector.setValueCount(numRows);
+      categoryUnencoded.setValueCount(numRows);
+
+      try (org.apache.arrow.vector.FieldVector encodedCategoryVector =
+          (org.apache.arrow.vector.FieldVector) DictionaryEncoder.encode(categoryUnencoded, dictionary)) {
+        List<Field> fields =
+            Arrays.asList(idVector.getField(), encodedCategoryVector.getField(), priceVector.getField());
+        List<org.apache.arrow.vector.FieldVector> vectors =
+            Arrays.asList(idVector, encodedCategoryVector, priceVector);
+        try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
+          return writeArrowDataToBytes(root, dictionaryProvider);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a batch with {@code id} (int32), {@code numbers} (list of int32) and {@code tags} (list of UTF-8).
+   * Row {@code i} (0-based) holds {@code id = i + 1}, {@code i + 1} numbers with values
+   * {@code (i + 1) * 10 + j}, and the two tags {@code "tag_" + i + "_0"} and {@code "tag_" + i + "_1"}.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root (no dictionary provider)
+   * @throws Exception if building or serializing the batch fails
+   */
+  public static byte[] createListArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field numbersElementField = new Field("$data$", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field numbersField =
+          new Field("numbers", FieldType.nullable(new ArrowType.List()), Arrays.asList(numbersElementField));
+
+      Field tagsElementField = new Field("$data$", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field tagsField = new Field("tags", FieldType.nullable(new ArrowType.List()), Arrays.asList(tagsElementField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, numbersField, tagsField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        ListVector numbersVector = (ListVector) root.getVector("numbers");
+        ListVector tagsVector = (ListVector) root.getVector("tags");
+        IntVector numbersChild = (IntVector) numbersVector.getDataVector();
+        VarCharVector tagsChild = (VarCharVector) tagsVector.getDataVector();
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        numbersVector.allocateNew();
+        tagsVector.allocateNew();
+
+        // Running write positions in the child vectors, which store the elements of all rows back to back.
+        int numbersElemIndex = 0;
+        int tagsElemIndex = 0;
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+
+          numbersVector.startNewValue(i);
+          for (int j = 0; j <= i; j++) {
+            numbersChild.setSafe(numbersElemIndex++, (i + 1) * 10 + j);
+          }
+          numbersVector.endValue(i, i + 1);
+
+          tagsVector.startNewValue(i);
+          for (int j = 0; j < 2; j++) {
+            // The element type is declared Utf8, so encode explicitly instead of using the default charset.
+            tagsChild.setSafe(tagsElemIndex++, ("tag_" + i + "_" + j).getBytes(StandardCharsets.UTF_8));
+          }
+          tagsVector.endValue(i, 2);
+        }
+
+        idVector.setValueCount(numRows);
+        numbersChild.setValueCount(numbersElemIndex);
+        numbersVector.setValueCount(numRows);
+        tagsChild.setValueCount(tagsElemIndex);
+        tagsVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds a batch with {@code id} (int32) and a {@code person} struct containing {@code name} (UTF-8),
+   * {@code age} (int32) and a nested {@code address} struct with {@code street} and {@code city} (UTF-8).
+   * Row {@code i} (0-based) holds {@code id = i + 1}, {@code name = "Person_" + (i + 1)},
+   * {@code age = 25 + i}, {@code street = (i + 1) + " Main St"} and {@code city = "City_" + (i + 1)}.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root (no dictionary provider)
+   * @throws Exception if building or serializing the batch fails
+   */
+  public static byte[] createStructArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field ageField = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field streetField = new Field("street", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field cityField = new Field("city", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field addressField =
+          new Field("address", FieldType.nullable(new ArrowType.Struct()), Arrays.asList(streetField, cityField));
+
+      Field personField =
+          new Field(
+              "person",
+              FieldType.nullable(new ArrowType.Struct()),
+              Arrays.asList(nameField, ageField, addressField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, personField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        StructVector personVector = (StructVector) root.getVector("person");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        personVector.allocateNew();
+
+        VarCharVector nameVector = (VarCharVector) personVector.getChild("name");
+        IntVector ageVector = (IntVector) personVector.getChild("age");
+        StructVector addressVector = (StructVector) personVector.getChild("address");
+        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");
+        VarCharVector cityVector = (VarCharVector) addressVector.getChild("city");
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          personVector.setIndexDefined(i);
+          addressVector.setIndexDefined(i);
+          // String fields are declared Utf8, so encode explicitly instead of using the default charset.
+          nameVector.setSafe(i, ("Person_" + (i + 1)).getBytes(StandardCharsets.UTF_8));
+          ageVector.setSafe(i, 25 + i);
+          streetVector.setSafe(i, ((i + 1) + " Main St").getBytes(StandardCharsets.UTF_8));
+          cityVector.setSafe(i, ("City_" + (i + 1)).getBytes(StandardCharsets.UTF_8));
+        }
+
+        idVector.setValueCount(numRows);
+        personVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        ageVector.setValueCount(numRows);
+        addressVector.setValueCount(numRows);
+        streetVector.setValueCount(numRows);
+        cityVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds a batch with {@code id} (int32) and a {@code metadata} map column (non-null UTF-8 keys, nullable
+   * int32 values, keys not declared sorted). Row {@code i} (0-based) holds {@code id = i + 1} and
+   * {@code 2 + (i % 2)} entries with key {@code "key_" + i + "_" + j} and value {@code (i + 1) * 100 + j}.
+   *
+   * @param numRows number of rows to populate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root (no dictionary provider)
+   * @throws Exception if building or serializing the batch fails
+   */
+  public static byte[] createMapArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field keyField = new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field valField = new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field entriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(keyField, valField));
+      Field mapField =
+          new Field("metadata", FieldType.nullable(new ArrowType.Map(false)), Arrays.asList(entriesField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, mapField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        MapVector mapVector = (MapVector) root.getVector("metadata");
+        StructVector entries = (StructVector) mapVector.getDataVector();
+        VarCharVector keyVector = (VarCharVector) entries.getChild(MapVector.KEY_NAME);
+        IntVector valueVector = (IntVector) entries.getChild(MapVector.VALUE_NAME);
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        mapVector.allocateNew();
+
+        // Running write position in the entries struct, which stores the entries of all rows back to back.
+        int entryIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          int entriesCount = 2 + (i % 2);
+          mapVector.startNewValue(i);
+          for (int j = 0; j < entriesCount; j++) {
+            // Keys are declared Utf8, so encode explicitly instead of using the platform default charset.
+            keyVector.setSafe(entryIndex, ("key_" + i + "_" + j).getBytes(StandardCharsets.UTF_8));
+            valueVector.setSafe(entryIndex, (i + 1) * 100 + j);
+            entries.setIndexDefined(entryIndex);
+            entryIndex++;
+          }
+          mapVector.endValue(i, entriesCount);
+        }
+
+        idVector.setValueCount(numRows);
+        keyVector.setValueCount(entryIndex);
+        valueVector.setValueCount(entryIndex);
+        entries.setValueCount(entryIndex);
+        mapVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds an Arrow stream holding a single batch with two columns: a nullable int32 {@code id} column and a
+   * nullable {@code metadata} column typed as map&lt;utf8, map&lt;utf8, int32&gt;&gt;.
+   *
+   * <p>Row {@code i} gets {@code id = i + 1} and {@code 2 + (i % 2)} outer entries keyed
+   * {@code outer_key_<i>_<j>}. Outer entry {@code j} carries an inner map of {@code 2 + (j % 2)} entries keyed
+   * {@code inner_key_<i>_<j>_<k>} whose value is {@code (i + 1) * 1000 + j * 10 + k}.
+   *
+   * <p>All keys are encoded explicitly as UTF-8 so the bytes stored in the {@code Utf8} vectors do not depend on
+   * the JVM's default charset.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes produced by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception if populating or serializing the vectors fails
+   */
+  public static byte[] createNestedMapArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      // Define inner map (value of outer map)
+      Field innerKeyField = new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field innerValField = new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field innerEntriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(innerKeyField, innerValField));
+      // The inner map is named VALUE_NAME because it is the value child of the outer map's entries struct
+      Field innerMapField =
+          new Field(
+              MapVector.VALUE_NAME,
+              FieldType.nullable(new ArrowType.Map(false)),
+              Arrays.asList(innerEntriesField));
+
+      // Define outer map with value as the inner map
+      Field outerKeyField = new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field outerEntriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(outerKeyField, innerMapField));
+      Field outerMapField =
+          new Field(
+              "metadata",
+              FieldType.nullable(new ArrowType.Map(false)),
+              Arrays.asList(outerEntriesField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, outerMapField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        MapVector outerMapVector = (MapVector) root.getVector("metadata");
+        StructVector outerEntries = (StructVector) outerMapVector.getDataVector();
+        VarCharVector outerKeyVector = (VarCharVector) outerEntries.getChild(MapVector.KEY_NAME);
+        MapVector innerMapVector = (MapVector) outerEntries.getChild(MapVector.VALUE_NAME);
+        StructVector innerEntries = (StructVector) innerMapVector.getDataVector();
+        VarCharVector innerKeyVector = (VarCharVector) innerEntries.getChild(MapVector.KEY_NAME);
+        IntVector innerValueVector = (IntVector) innerEntries.getChild(MapVector.VALUE_NAME);
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        outerMapVector.allocateNew();
+
+        // Running write positions: one slot per outer entry, one slot per inner entry, across all rows
+        int outerEntryIndex = 0;
+        int innerEntryIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+
+          int outerEntriesCount = 2 + (i % 2); // 2 or 3 outer entries
+          outerMapVector.startNewValue(i);
+          for (int j = 0; j < outerEntriesCount; j++) {
+            // Set outer key
+            outerKeyVector.setSafe(outerEntryIndex,
+                ("outer_key_" + i + "_" + j).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+            // Populate inner map for this outer entry at aligned index
+            innerMapVector.startNewValue(outerEntryIndex);
+            int innerEntriesCount = 2 + (j % 2); // 2 or 3 inner entries
+            for (int k = 0; k < innerEntriesCount; k++) {
+              innerKeyVector.setSafe(innerEntryIndex,
+                  ("inner_key_" + i + "_" + j + "_" + k).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+              innerValueVector.setSafe(innerEntryIndex, (i + 1) * 1000 + j * 10 + k);
+              innerEntries.setIndexDefined(innerEntryIndex);
+              innerEntryIndex++;
+            }
+            innerMapVector.endValue(outerEntryIndex, innerEntriesCount);
+
+            outerEntries.setIndexDefined(outerEntryIndex);
+            outerEntryIndex++;
+          }
+          outerMapVector.endValue(i, outerEntriesCount);
+        }
+
+        // Value counts: inner leaves use the inner-entry total, vectors aligned with outer entries use the
+        // outer-entry total, and top-level columns use the row count
+        idVector.setValueCount(numRows);
+        outerKeyVector.setValueCount(outerEntryIndex);
+        innerKeyVector.setValueCount(innerEntryIndex);
+        innerValueVector.setValueCount(innerEntryIndex);
+        innerEntries.setValueCount(innerEntryIndex);
+        innerMapVector.setValueCount(outerEntryIndex);
+        outerEntries.setValueCount(outerEntryIndex);
+        outerMapVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds an Arrow stream holding a single batch with two columns: a nullable int32 {@code id} column and a
+   * nullable {@code items} column typed as list&lt;struct&lt;item_name: utf8, item_price: float64&gt;&gt;.
+   *
+   * <p>Row {@code i} gets {@code id = i + 1} and {@code 1 + (i % 3)} items. Item {@code j} of row {@code i} is
+   * named {@code item_<i>_<j>} and priced {@code 10.99 + (i * 5.0) + j}.
+   *
+   * <p>Item names are encoded explicitly as UTF-8 so the bytes stored in the {@code Utf8} vector do not depend
+   * on the JVM's default charset.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes produced by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception if populating or serializing the vectors fails
+   */
+  public static byte[] createNestedListStructArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field itemNameField = new Field("item_name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field itemPriceField =
+          new Field("item_price", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+              null);
+      // Struct element type of the list
+      Field itemStructField =
+          new Field(
+              "$data$",
+              FieldType.nullable(new ArrowType.Struct()),
+              Arrays.asList(itemNameField, itemPriceField));
+
+      Field itemsField = new Field("items", FieldType.nullable(new ArrowType.List()), Arrays.asList(itemStructField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, itemsField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        ListVector itemsVector = (ListVector) root.getVector("items");
+        StructVector itemStructVector = (StructVector) itemsVector.getDataVector();
+        VarCharVector itemNameVector = (VarCharVector) itemStructVector.getChild("item_name");
+        Float8Vector itemPriceVector = (Float8Vector) itemStructVector.getChild("item_price");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        itemsVector.allocateNew();
+
+        // Running write position in the struct child vectors, shared by all rows
+        int structIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          int itemsCount = 1 + (i % 3); // 1, 2 or 3 items
+          itemsVector.startNewValue(i);
+          for (int j = 0; j < itemsCount; j++) {
+            itemNameVector.setSafe(structIndex,
+                ("item_" + i + "_" + j).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            itemPriceVector.setSafe(structIndex, 10.99 + (i * 5.0) + j);
+            itemStructVector.setIndexDefined(structIndex);
+            structIndex++;
+          }
+          itemsVector.endValue(i, itemsCount);
+        }
+
+        // Top-level columns use the row count; struct leaves use the total number of items written
+        idVector.setValueCount(numRows);
+        itemsVector.setValueCount(numRows);
+        itemNameVector.setValueCount(structIndex);
+        itemPriceVector.setValueCount(structIndex);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Serializes the current contents of {@code root} into an in-memory byte array using an
+   * {@link ArrowStreamWriter}: the stream is started, exactly one batch is written, and the stream is ended.
+   *
+   * @param root vector schema root whose data is written as the single batch
+   * @param dictionaryProvider dictionary provider handed to the writer; callers in this file pass {@code null}
+   *     when no dictionaries are involved
+   * @return every byte the writer emitted
+   * @throws Exception if writing or closing the writer or channel fails
+   */
+  private static byte[] writeArrowDataToBytes(VectorSchemaRoot root, DictionaryProvider dictionaryProvider)
+      throws Exception {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    try (
+        WritableByteChannel sink = Channels.newChannel(buffer);
+        ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, dictionaryProvider, sink)
+    ) {
+      streamWriter.start();
+      streamWriter.writeBatch();
+      streamWriter.end();
+    }
+    // Read the buffer only after the writer and channel have been closed by the try-with-resources block
+    return buffer.toByteArray();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pom.xml
b/pinot-plugins/pinot-input-format/pom.xml
index e9aabe0079f..78c6057fe0f 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pom.xml
@@ -37,6 +37,7 @@
</properties>
<modules>
+ <module>pinot-arrow</module>
<module>pinot-avro</module>
<module>pinot-avro-base</module>
<module>pinot-clp-log</module>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]