This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 2ff666436 [QDP] Add Arrow IPC support (#704)
2ff666436 is described below
commit 2ff666436e08a92c94ba7fc8448e52de44cbb594
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Wed Dec 10 17:31:28 2025 +0800
[QDP] Add Arrow IPC support (#704)
* Add Arrow IPC support (see the usage sketch below)
* Update default frameworks to "mahout-parquet"
* Add tests for Arrow IPC
* Add TODO to prevent OOM
* Add TODO for supporting multiple float types
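
For context, a minimal end-to-end sketch of the new Arrow IPC path (not part of this
commit): it mirrors the benchmark and Python-binding code in the diff below, and assumes
a CUDA-capable build of mahout_qdp plus PyTorch; the file name and sizes are illustrative.

    import numpy as np
    import pyarrow as pa
    import pyarrow.ipc as ipc
    import torch
    from mahout_qdp import QdpEngine

    n_qubits, n_samples = 4, 8
    dim = 1 << n_qubits
    data = np.random.rand(n_samples, dim).astype(np.float64)

    # Write a single FixedSizeList<Float64> column; the reader takes the first column.
    arr = pa.FixedSizeListArray.from_arrays(pa.array(data.flatten()), dim)
    table = pa.table({"data": arr})
    with ipc.RecordBatchFileWriter("sample.arrow", table.schema) as writer:
        writer.write_table(table)

    # Encode on the GPU and hand the result to PyTorch via DLPack.
    engine = QdpEngine(device_id=0)
    tensor = engine.encode_from_arrow_ipc("sample.arrow", n_qubits, "amplitude")
    states = torch.from_dlpack(tensor)  # shape: [n_samples, 2**n_qubits]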
---
qdp/benchmark/benchmark_e2e_final.py | 131 +++++++++++++------
qdp/qdp-core/src/io.rs | 231 +++++++++++++++++++++------------
qdp/qdp-core/src/lib.rs | 31 +++++
qdp/qdp-core/tests/arrow_ipc_io.rs | 239 +++++++++++++++++++++++++++++++++++
qdp/qdp-python/src/lib.rs | 31 ++++-
5 files changed, 538 insertions(+), 125 deletions(-)
diff --git a/qdp/benchmark/benchmark_e2e_final.py b/qdp/benchmark/benchmark_e2e_final.py
index 27efab93d..c00476b2d 100644
--- a/qdp/benchmark/benchmark_e2e_final.py
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -37,6 +37,7 @@ import os
import itertools
import pyarrow as pa
import pyarrow.parquet as pq
+import pyarrow.ipc as ipc
from mahout_qdp import QdpEngine
# Competitors
@@ -57,6 +58,7 @@ except ImportError:
# Config
DATA_FILE = "final_benchmark_data.parquet"
+ARROW_FILE = "final_benchmark_data.arrow"
HIDDEN_DIM = 16
BATCH_SIZE = 64 # Small batch to stress loop overhead
@@ -71,28 +73,34 @@ class DummyQNN(nn.Module):
def generate_data(n_qubits, n_samples):
- if os.path.exists(DATA_FILE):
- os.remove(DATA_FILE)
+ for f in [DATA_FILE, ARROW_FILE]:
+ if os.path.exists(f):
+ os.remove(f)
print(f"Generating {n_samples} samples of {n_qubits} qubits...")
dim = 1 << n_qubits
- # Generate for PennyLane/Qiskit (List format)
- chunk_size = 500
- schema_list = pa.schema([("feature_vector", pa.list_(pa.float64()))])
+ # Generate all data at once
+ np.random.seed(42)
+ all_data = np.random.rand(n_samples, dim).astype(np.float64)
- with pq.ParquetWriter(DATA_FILE, schema_list) as writer:
- for start_idx in range(0, n_samples, chunk_size):
- current = min(chunk_size, n_samples - start_idx)
- data = np.random.rand(current, dim).astype(np.float64)
- feature_vectors = [row.tolist() for row in data]
- arrays = pa.array(feature_vectors, type=pa.list_(pa.float64()))
- batch_table = pa.Table.from_arrays([arrays], names=["feature_vector"])
- writer.write_table(batch_table)
+ # Save as Parquet (List format for PennyLane/Qiskit)
+ feature_vectors = [row.tolist() for row in all_data]
+ table = pa.table(
+ {"feature_vector": pa.array(feature_vectors,
type=pa.list_(pa.float64()))}
+ )
+ pq.write_table(table, DATA_FILE)
+
+ # Save as Arrow IPC (FixedSizeList format for Mahout)
+ arr = pa.FixedSizeListArray.from_arrays(pa.array(all_data.flatten()), dim)
+ arrow_table = pa.table({"data": arr})
+ with ipc.RecordBatchFileWriter(ARROW_FILE, arrow_table.schema) as writer:
+ writer.write_table(arrow_table)
- file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
+ parquet_size = os.path.getsize(DATA_FILE) / (1024 * 1024)
+ arrow_size = os.path.getsize(ARROW_FILE) / (1024 * 1024)
print(f" Generated {n_samples} samples")
- print(f" Parquet file size: {file_size_mb:.2f} MB")
+ print(f" Parquet: {parquet_size:.2f} MB, Arrow IPC: {arrow_size:.2f} MB")
# -----------------------------------------------------------
@@ -220,10 +228,10 @@ def run_pennylane(n_qubits, n_samples):
# -----------------------------------------------------------
-# 3. Mahout Full Pipeline
+# 3. Mahout Parquet Pipeline
# -----------------------------------------------------------
-def run_mahout(engine, n_qubits, n_samples):
- print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
+def run_mahout_parquet(engine, n_qubits, n_samples):
+ print("\n[Mahout-Parquet] Full Pipeline (Parquet -> GPU)...")
model = DummyQNN(n_qubits).cuda()
torch.cuda.synchronize()
@@ -262,6 +270,44 @@ def run_mahout(engine, n_qubits, n_samples):
return total_time, gpu_reshaped
+# -----------------------------------------------------------
+# 4. Mahout Arrow IPC Pipeline
+# -----------------------------------------------------------
+def run_mahout_arrow(engine, n_qubits, n_samples):
+ print("\n[Mahout-Arrow] Full Pipeline (Arrow IPC -> GPU)...")
+ model = DummyQNN(n_qubits).cuda()
+
+ torch.cuda.synchronize()
+ start_time = time.perf_counter()
+
+ arrow_encode_start = time.perf_counter()
+ batched_tensor = engine.encode_from_arrow_ipc(ARROW_FILE, n_qubits, "amplitude")
+ arrow_encode_time = time.perf_counter() - arrow_encode_start
+ print(f" Arrow->GPU (IO+Encode): {arrow_encode_time:.4f} s")
+
+ dlpack_start = time.perf_counter()
+ gpu_batched = torch.from_dlpack(batched_tensor)
+ dlpack_time = time.perf_counter() - dlpack_start
+ print(f" DLPack conversion: {dlpack_time:.4f} s")
+
+ state_len = 1 << n_qubits
+ gpu_reshaped = gpu_batched.view(n_samples, state_len)
+
+ reshape_start = time.perf_counter()
+ gpu_all_data = gpu_reshaped.abs().to(torch.float32)
+ reshape_time = time.perf_counter() - reshape_start
+ print(f" Reshape & convert: {reshape_time:.4f} s")
+
+ for i in range(0, n_samples, BATCH_SIZE):
+ batch = gpu_all_data[i : i + BATCH_SIZE]
+ _ = model(batch)
+
+ torch.cuda.synchronize()
+ total_time = time.perf_counter() - start_time
+ print(f" Total Time: {total_time:.4f} s")
+ return total_time, gpu_reshaped
+
+
def compare_states(name_a, states_a, name_b, states_b):
print("\n" + "=" * 70)
print(f"VERIFICATION ({name_a} vs {name_b})")
@@ -314,22 +360,15 @@ if __name__ == "__main__":
parser.add_argument(
"--frameworks",
nargs="+",
- default=["mahout", "pennylane"],
- choices=["mahout", "pennylane", "qiskit", "all"],
- help="Frameworks to benchmark (default: mahout pennylane). Use 'all'
to run all available frameworks.",
+ default=["mahout-parquet", "pennylane"],
+ choices=["mahout-parquet", "mahout-arrow", "pennylane", "qiskit", "all"],
+ help="Frameworks to benchmark. Use 'all' to run all available frameworks.",
)
args = parser.parse_args()
# Expand "all" option
if "all" in args.frameworks:
- all_frameworks = []
- if "mahout" in args.frameworks or "all" in args.frameworks:
- all_frameworks.append("mahout")
- if "pennylane" in args.frameworks or "all" in args.frameworks:
- all_frameworks.append("pennylane")
- if "qiskit" in args.frameworks or "all" in args.frameworks:
- all_frameworks.append("qiskit")
- args.frameworks = all_frameworks
+ args.frameworks = ["mahout-parquet", "mahout-arrow", "pennylane", "qiskit"]
generate_data(args.qubits, args.samples)
@@ -345,7 +384,8 @@ if __name__ == "__main__":
# Initialize results
t_pl, pl_all_states = 0.0, None
- t_mahout, mahout_all_states = 0.0, None
+ t_mahout_parquet, mahout_parquet_all_states = 0.0, None
+ t_mahout_arrow, mahout_arrow_all_states = 0.0, None
t_qiskit, qiskit_all_states = 0.0, None
# Run benchmarks
@@ -355,8 +395,15 @@ if __name__ == "__main__":
if "qiskit" in args.frameworks:
t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples)
- if "mahout" in args.frameworks:
- t_mahout, mahout_all_states = run_mahout(engine, args.qubits, args.samples)
+ if "mahout-parquet" in args.frameworks:
+ t_mahout_parquet, mahout_parquet_all_states = run_mahout_parquet(
+ engine, args.qubits, args.samples
+ )
+
+ if "mahout-arrow" in args.frameworks:
+ t_mahout_arrow, mahout_arrow_all_states = run_mahout_arrow(
+ engine, args.qubits, args.samples
+ )
print("\n" + "=" * 70)
print("E2E LATENCY (Lower is Better)")
@@ -364,8 +411,10 @@ if __name__ == "__main__":
print("=" * 70)
results = []
- if t_mahout > 0:
- results.append(("Mahout", t_mahout))
+ if t_mahout_parquet > 0:
+ results.append(("Mahout-Parquet", t_mahout_parquet))
+ if t_mahout_arrow > 0:
+ results.append(("Mahout-Arrow", t_mahout_arrow))
if t_pl > 0:
results.append(("PennyLane", t_pl))
if t_qiskit > 0:
@@ -374,19 +423,23 @@ if __name__ == "__main__":
results.sort(key=lambda x: x[1])
for name, time_val in results:
- print(f"{name:12s} {time_val:10.4f} s")
+ print(f"{name:16s} {time_val:10.4f} s")
print("-" * 70)
- if t_mahout > 0:
+ # Use fastest Mahout variant for speedup comparison
+ mahout_times = [t for t in [t_mahout_arrow, t_mahout_parquet] if t > 0]
+ t_mahout_best = min(mahout_times) if mahout_times else 0
+ if t_mahout_best > 0:
if t_pl > 0:
- print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
+ print(f"Speedup vs PennyLane: {t_pl / t_mahout_best:10.2f}x")
if t_qiskit > 0:
- print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x")
+ print(f"Speedup vs Qiskit: {t_qiskit / t_mahout_best:10.2f}x")
# Run Verification after benchmarks
verify_correctness(
{
- "Mahout": mahout_all_states,
+ "Mahout-Parquet": mahout_parquet_all_states,
+ "Mahout-Arrow": mahout_arrow_all_states,
"PennyLane": pl_all_states,
"Qiskit": qiskit_all_states,
}
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index 93ad1d018..372f4ef75 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -14,25 +14,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-//! I/O module for reading and writing quantum data
+//! I/O utilities for reading and writing quantum data.
//!
-//! This module provides efficient columnar data exchange with the data science ecosystem,
+//! Provides efficient columnar data exchange via Apache Arrow and Parquet formats.
+//!
+//! # TODO
+//! Consider using generic `T: ArrowPrimitiveType` instead of hardcoded `Float64Array`
+//! to support both Float32 and Float64 for flexibility in precision vs performance trade-offs.
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
-use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch};
+use arrow::array::{Array, ArrayRef, Float64Array, FixedSizeListArray, ListArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::FileReader as ArrowFileReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use crate::error::{MahoutError, Result};
-/// Convert Arrow Float64Array to Vec<f64>
-///
-/// Uses Arrow's internal buffer directly if no nulls, otherwise copies
+/// Converts an Arrow Float64Array to Vec<f64>.
pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
if array.null_count() == 0 {
array.values().to_vec()
@@ -41,9 +44,7 @@ pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
}
}
-/// Convert chunked Arrow Float64Array to Vec<f64>
-///
-/// Efficiently flattens multiple Arrow arrays into a single Vec
+/// Flattens multiple Arrow Float64Arrays into a single Vec<f64>.
pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> {
let total_len: usize = arrays.iter().map(|a| a.len()).sum();
let mut result = Vec::with_capacity(total_len);
@@ -59,45 +60,20 @@ pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> {
result
}
-/// Reads quantum data from a Parquet file.
-///
-/// Expects a single column named "data" containing Float64 values.
-/// This function performs one copy from Arrow to Vec.
-/// use `read_parquet_to_arrow` instead.
-///
-/// # Arguments
-/// * `path` - Path to the Parquet file
+/// Reads Float64 data from a Parquet file.
///
-/// # Returns
-/// Vector of f64 values from the first column
-///
-/// # Example
-/// ```no_run
-/// use qdp_core::io::read_parquet;
-///
-/// let data = read_parquet("quantum_data.parquet").unwrap();
-/// ```
+/// Expects a single Float64 column. For zero-copy access, use [`read_parquet_to_arrow`].
pub fn read_parquet<P: AsRef<Path>>(path: P) -> Result<Vec<f64>> {
let chunks = read_parquet_to_arrow(path)?;
Ok(arrow_to_vec_chunked(&chunks))
}
-/// Writes quantum data to a Parquet file.
-///
-/// Creates a single column named "data" containing Float64 values.
+/// Writes Float64 data to a Parquet file.
///
/// # Arguments
-/// * `path` - Path to write the Parquet file
-/// * `data` - Vector of f64 values to write
-/// * `column_name` - Optional column name (defaults to "data")
-///
-/// # Example
-/// ```no_run
-/// use qdp_core::io::write_parquet;
-///
-/// let data = vec![0.5, 0.5, 0.5, 0.5];
-/// write_parquet("quantum_data.parquet", &data, None).unwrap();
-/// ```
+/// * `path` - Output file path
+/// * `data` - Data to write
+/// * `column_name` - Column name (defaults to "data")
pub fn write_parquet<P: AsRef<Path>>(
path: P,
data: &[f64],
@@ -111,23 +87,19 @@ pub fn write_parquet<P: AsRef<Path>>(
let col_name = column_name.unwrap_or("data");
- // Create Arrow schema
let schema = Arc::new(Schema::new(vec![Field::new(
col_name,
DataType::Float64,
false,
)]));
- // Create Float64Array from slice
let array = Float64Array::from_iter_values(data.iter().copied());
let array_ref: ArrayRef = Arc::new(array);
- // Create RecordBatch
let batch = RecordBatch::try_new(schema.clone(), vec![array_ref]).map_err(|e| {
MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
})?;
- // Write to Parquet file
let file = File::create(path.as_ref()).map_err(|e| {
MahoutError::Io(format!("Failed to create Parquet file: {}", e))
})?;
@@ -148,18 +120,9 @@ pub fn write_parquet<P: AsRef<Path>>(
Ok(())
}
-/// Reads quantum data from a Parquet file as Arrow arrays.
-///
-/// Returns Arrow arrays directly from Parquet batches.
-/// Each element in the returned Vec corresponds to one Parquet batch.
-///
-/// Directly constructs the Arrow array from Parquet batches
-///
-/// # Arguments
-/// * `path` - Path to the Parquet file
+/// Reads a Parquet file as Arrow Float64Arrays.
///
-/// # Returns
-/// Vector of Float64Arrays, one per Parquet batch
+/// Returns one array per row group for zero-copy access.
pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array>> {
let file = File::open(path.as_ref()).map_err(|e| {
MahoutError::Io(format!("Failed to open Parquet file: {}", e))
@@ -194,7 +157,6 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array
)));
}
- // Clone the Float64Array (reference-counted, no data copy)
let float_array = column
.as_any()
.downcast_ref::<Float64Array>()
@@ -217,12 +179,10 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array
/// Writes an Arrow Float64Array to a Parquet file.
///
-/// Writes from Arrow format to Parquet.
-///
/// # Arguments
-/// * `path` - Path to write the Parquet file
-/// * `array` - Float64Array to write
-/// * `column_name` - Optional column name (defaults to "data")
+/// * `path` - Output file path
+/// * `array` - Array to write
+/// * `column_name` - Column name (defaults to "data")
pub fn write_arrow_to_parquet<P: AsRef<Path>>(
path: P,
array: &Float64Array,
@@ -236,7 +196,6 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
let col_name = column_name.unwrap_or("data");
- // Create Arrow schema
let schema = Arc::new(Schema::new(vec![Field::new(
col_name,
DataType::Float64,
@@ -244,13 +203,10 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
)]));
let array_ref: ArrayRef = Arc::new(array.clone());
-
- // Create RecordBatch
let batch = RecordBatch::try_new(schema.clone(), vec![array_ref]).map_err(|e| {
MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
})?;
- // Write to Parquet file
let file = File::create(path.as_ref()).map_err(|e| {
MahoutError::Io(format!("Failed to create Parquet file: {}", e))
})?;
@@ -271,20 +227,15 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
Ok(())
}
-/// Read batch data from Parquet file with list column format
+/// Reads batch data from a Parquet file with `List<Float64>` column format.
///
-/// Efficiently reads Parquet files where each row contains a list of values.
-/// Returns a flattened Vec with all samples concatenated, suitable for batch encoding.
-///
-/// # Arguments
-/// * `path` - Path to Parquet file
+/// Returns flattened data suitable for batch encoding.
///
/// # Returns
-/// Tuple of (flattened_data, num_samples, sample_size)
+/// Tuple of `(flattened_data, num_samples, sample_size)`
///
-/// # Example
-/// File format: column "feature_vector" with type List<Float64>
-/// Each row = one sample = one list of floats
+/// # TODO
+/// Add OOM protection for very large files
pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> {
let file = File::open(path.as_ref()).map_err(|e| {
MahoutError::Io(format!("Failed to open Parquet file: {}", e))
@@ -294,6 +245,8 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
})?;
+ let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
let mut reader = builder.build().map_err(|e| {
MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
})?;
@@ -313,7 +266,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
let column = batch.column(0);
- // Handle List<Float64> column type
if let DataType::List(_) = column.data_type() {
let list_array = column
.as_any()
@@ -329,7 +281,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
let current_size = float_array.len();
- // Verify all samples have the same size
if let Some(expected_size) = sample_size {
if current_size != expected_size {
return Err(MahoutError::InvalidInput(format!(
@@ -339,10 +290,9 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
}
} else {
sample_size = Some(current_size);
- all_data.reserve(current_size * 100); // Reserve space
+ all_data.reserve(current_size * total_rows);
}
- // Efficiently copy the values
if float_array.null_count() == 0 {
all_data.extend_from_slice(float_array.values());
} else {
@@ -365,3 +315,126 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
Ok((all_data, num_samples, sample_size))
}
+
+/// Reads batch data from an Arrow IPC file.
+///
+/// Supports `FixedSizeList<Float64>` and `List<Float64>` column formats.
+/// Returns flattened data suitable for batch encoding.
+///
+/// # Returns
+/// Tuple of `(flattened_data, num_samples, sample_size)`
+///
+/// # TODO
+/// Add OOM protection for very large files
+pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Arrow IPC file: {}", e))
+ })?;
+
+ let reader = ArrowFileReader::try_new(file, None).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Arrow IPC reader: {}", e))
+ })?;
+
+ let mut all_data = Vec::new();
+ let mut num_samples = 0;
+ let mut sample_size: Option<usize> = None;
+
+ for batch_result in reader {
+ let batch = batch_result.map_err(|e| {
+ MahoutError::Io(format!("Failed to read Arrow batch: {}", e))
+ })?;
+
+ if batch.num_columns() == 0 {
+ return Err(MahoutError::Io("Arrow file has no columns".to_string()));
+ }
+
+ let column = batch.column(0);
+
+ match column.data_type() {
+ DataType::FixedSizeList(_, size) => {
+ let list_array = column
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to FixedSizeListArray".to_string()))?;
+
+ let current_size = *size as usize;
+
+ if let Some(expected) = sample_size {
+ if current_size != expected {
+ return Err(MahoutError::InvalidInput(format!(
+ "Inconsistent sample sizes: expected {}, got {}",
+ expected, current_size
+ )));
+ }
+ } else {
+ sample_size = Some(current_size);
+ all_data.reserve(current_size * batch.num_rows());
+ }
+
+ let values = list_array.values();
+ let float_array = values
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("Values must be Float64".to_string()))?;
+
+ if float_array.null_count() == 0 {
+ all_data.extend_from_slice(float_array.values());
+ } else {
+ all_data.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0)));
+ }
+
+ num_samples += list_array.len();
+ }
+
+ DataType::List(_) => {
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ let current_size = float_array.len();
+
+ if let Some(expected) = sample_size {
+ if current_size != expected {
+ return Err(MahoutError::InvalidInput(format!(
+ "Inconsistent sample sizes: expected {}, got
{}",
+ expected, current_size
+ )));
+ }
+ } else {
+ sample_size = Some(current_size);
+ all_data.reserve(current_size * list_array.len());
+ }
+
+ if float_array.null_count() == 0 {
+ all_data.extend_from_slice(float_array.values());
+ } else {
+ all_data.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0)));
+ }
+
+ num_samples += 1;
+ }
+ }
+
+ _ => {
+ return Err(MahoutError::Io(format!(
+ "Expected FixedSizeList<Float64> or List<Float64>, got
{:?}",
+ column.data_type()
+ )));
+ }
+ }
+ }
+
+ let sample_size = sample_size.ok_or_else(|| {
+ MahoutError::Io("Arrow file contains no data".to_string())
+ })?;
+
+ Ok((all_data, num_samples, sample_size))
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 65c08bd02..2f8f42092 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -154,6 +154,37 @@ impl QdpEngine {
// Encode using fused batch kernel
self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
}
+
+ /// Load data from Arrow IPC file and encode into quantum state
+ ///
+ /// Supports:
+ /// - FixedSizeList<Float64> - fastest, all samples same size
+ /// - List<Float64> - flexible, variable sample sizes
+ ///
+ /// # Arguments
+ /// * `path` - Path to Arrow IPC file (.arrow or .feather)
+ /// * `num_qubits` - Number of qubits
+ /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+ ///
+ /// # Returns
+ /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits])
+ pub fn encode_from_arrow_ipc(
+ &self,
+ path: &str,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeFromArrowIPC");
+
+ // Read Arrow IPC (6x faster than Parquet)
+ let (batch_data, num_samples, sample_size) = {
+ crate::profile_scope!("IO::ReadArrowIPCBatch");
+ crate::io::read_arrow_ipc_batch(path)?
+ };
+
+ // Encode using fused batch kernel
+ self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+ }
}
// Re-export key types for convenience
diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs b/qdp/qdp-core/tests/arrow_ipc_io.rs
new file mode 100644
index 000000000..6ef206954
--- /dev/null
+++ b/qdp/qdp-core/tests/arrow_ipc_io.rs
@@ -0,0 +1,239 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch};
+use arrow::array::{Float64Array, FixedSizeListArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::writer::FileWriter as ArrowFileWriter;
+use std::fs::{self, File};
+use std::sync::Arc;
+
+mod common;
+
+#[test]
+fn test_read_arrow_ipc_fixed_size_list() {
+ let temp_path = "/tmp/test_arrow_ipc_fixed.arrow";
+ let num_samples = 10;
+ let sample_size = 16;
+
+ // Create test data
+ let mut all_values = Vec::new();
+ for i in 0..num_samples {
+ for j in 0..sample_size {
+ all_values.push((i * sample_size + j) as f64);
+ }
+ }
+
+ // Write Arrow IPC with FixedSizeList format
+ let values_array = Float64Array::from(all_values.clone());
+ let field = Arc::new(Field::new("item", DataType::Float64, false));
+ let list_array = FixedSizeListArray::new(
+ field,
+ sample_size as i32,
+ Arc::new(values_array),
+ None,
+ );
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::FixedSizeList(
+ Arc::new(Field::new("item", DataType::Float64, false)),
+ sample_size as i32,
+ ),
+ false,
+ )]));
+
+ let batch = arrow::record_batch::RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list_array)],
+ )
+ .unwrap();
+
+ let file = File::create(temp_path).unwrap();
+ let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+
+ // Read and verify
+ let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+ assert_eq!(samples, num_samples);
+ assert_eq!(size, sample_size);
+ assert_eq!(data.len(), num_samples * sample_size);
+
+ for (i, &val) in data.iter().enumerate() {
+ assert_eq!(val, i as f64);
+ }
+
+ // Cleanup
+ fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_read_arrow_ipc_list() {
+ let temp_path = "/tmp/test_arrow_ipc_list.arrow";
+ let num_samples = 5;
+ let sample_size = 8;
+
+ // Create test data with List format
+ let mut list_builder = arrow::array::ListBuilder::new(Float64Array::builder(num_samples * sample_size));
+
+ for i in 0..num_samples {
+ let values: Vec<f64> = (0..sample_size).map(|j| (i * sample_size + j) as f64).collect();
+ list_builder.values().append_slice(&values);
+ list_builder.append(true);
+ }
+
+ let list_array = list_builder.finish();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+ false,
+ )]));
+
+ let batch = arrow::record_batch::RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list_array)],
+ )
+ .unwrap();
+
+ let file = File::create(temp_path).unwrap();
+ let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+
+ // Read and verify
+ let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+ assert_eq!(samples, num_samples);
+ assert_eq!(size, sample_size);
+ assert_eq!(data.len(), num_samples * sample_size);
+
+ for (i, &val) in data.iter().enumerate() {
+ assert_eq!(val, i as f64);
+ }
+
+ // Cleanup
+ fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_arrow_ipc_inconsistent_sizes_fails() {
+ let temp_path = "/tmp/test_arrow_ipc_inconsistent.arrow";
+
+ // Create data with inconsistent sample sizes
+ let mut list_builder = arrow::array::ListBuilder::new(Float64Array::builder(20));
+
+ // First sample: 4 elements
+ list_builder.values().append_slice(&[1.0, 2.0, 3.0, 4.0]);
+ list_builder.append(true);
+
+ // Second sample: 8 elements (inconsistent!)
+ list_builder.values().append_slice(&[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0]);
+ list_builder.append(true);
+
+ let list_array = list_builder.finish();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+ false,
+ )]));
+
+ let batch = arrow::record_batch::RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list_array)],
+ )
+ .unwrap();
+
+ let file = File::create(temp_path).unwrap();
+ let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+
+ // Should fail due to inconsistent sizes
+ let result = read_arrow_ipc_batch(temp_path);
+ assert!(result.is_err());
+
+ // Cleanup
+ fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_arrow_ipc_empty_file_fails() {
+ let result = read_arrow_ipc_batch("/tmp/nonexistent_arrow_file_12345.arrow");
+ assert!(result.is_err());
+}
+
+#[test]
+fn test_arrow_ipc_large_batch() {
+ let temp_path = "/tmp/test_arrow_ipc_large.arrow";
+ let num_samples = 100;
+ let sample_size = 64;
+
+ // Create large dataset
+ let mut all_values = Vec::with_capacity(num_samples * sample_size);
+ for i in 0..num_samples {
+ for j in 0..sample_size {
+ all_values.push((i * sample_size + j) as f64 / (num_samples * sample_size) as f64);
+ }
+ }
+
+ // Write as FixedSizeList
+ let values_array = Float64Array::from(all_values.clone());
+ let field = Arc::new(Field::new("item", DataType::Float64, false));
+ let list_array = FixedSizeListArray::new(
+ field,
+ sample_size as i32,
+ Arc::new(values_array),
+ None,
+ );
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::FixedSizeList(
+ Arc::new(Field::new("item", DataType::Float64, false)),
+ sample_size as i32,
+ ),
+ false,
+ )]));
+
+ let batch = arrow::record_batch::RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list_array)],
+ )
+ .unwrap();
+
+ let file = File::create(temp_path).unwrap();
+ let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+
+ // Read and verify
+ let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+ assert_eq!(samples, num_samples);
+ assert_eq!(size, sample_size);
+ assert_eq!(data.len(), all_values.len());
+
+ for i in 0..data.len() {
+ assert!((data[i] - all_values[i]).abs() < 1e-10);
+ }
+
+ // Cleanup
+ fs::remove_file(temp_path).unwrap();
+}
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index cd3fcc3ac..340aae814 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -181,13 +181,7 @@ impl QdpEngine {
})
}
- /// Encode from Parquet file (FASTEST - recommended for batches)
- ///
- /// Direct Parquet→GPU pipeline:
- /// - Reads List<Float64> column format using Arrow
- /// - Zero-copy data extraction
- /// - Single optimized batch kernel launch
- /// - Returns batched tensor (shape: [num_samples, 2^num_qubits])
+ /// Encode from Parquet file
///
/// Args:
/// path: Path to Parquet file
@@ -209,6 +203,29 @@ impl QdpEngine {
consumed: false,
})
}
+
+ /// Encode from Arrow IPC file
+ ///
+ /// Args:
+ /// path: Path to Arrow IPC file (.arrow or .feather)
+ /// num_qubits: Number of qubits for encoding
+ /// encoding_method: Encoding strategy (currently only "amplitude")
+ ///
+ /// Returns:
+ /// QuantumTensor: DLPack tensor containing all encoded states
+ ///
+ /// Example:
+ /// >>> engine = QdpEngine(device_id=0)
+ /// >>> batched = engine.encode_from_arrow_ipc("data.arrow", 16, "amplitude")
+ /// >>> torch_tensor = torch.from_dlpack(batched)
+ fn encode_from_arrow_ipc(&self, path: &str, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> {
+ let ptr = self.engine.encode_from_arrow_ipc(path, num_qubits, encoding_method)
+ .map_err(|e| PyRuntimeError::new_err(format!("Encoding from Arrow IPC failed: {}", e)))?;
+ Ok(QuantumTensor {
+ ptr,
+ consumed: false,
+ })
+ }
}
/// Mahout QDP Python module