This is an automated email from the ASF dual-hosted git repository.
kevingurney pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b75373d0f GH-44923: [MATLAB] Add IPC `RecordBatchStreamReader` MATLAB
class (#45068)
8b75373d0f is described below
commit 8b75373d0f344a69057fedc38ac1ceafb9b6b734
Author: Kevin Gurney <[email protected]>
AuthorDate: Mon Dec 23 15:53:11 2024 -0500
GH-44923: [MATLAB] Add IPC `RecordBatchStreamReader` MATLAB class (#45068)
### Rationale for this change
To enable support for the IPC Streaming format in the MATLAB interface, we
should add a `RecordBatchStreamReader` class.
This is a followup to #44922
### What changes are included in this PR?
1. Added a new `arrow.io.ipc.RecordBatchStreamReader` MATLAB class.
### Are these changes tested?
Yes.
1. Added new MATLAB test suite
`arrow/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m`.
### Are there any user-facing changes?
Yes.
1. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects to
read `RecordBatch` objects incrementally from an Arrow IPC Stream file.
### Notes
1. Thank you @ sgilmore10 for your help with this pull request!
* GitHub Issue: #44923
Lead-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Sarah Gilmore <[email protected]>
Signed-off-by: Kevin Gurney <[email protected]>
---
matlab/src/cpp/arrow/matlab/error/error.h | 2 +
.../io/ipc/proxy/record_batch_stream_reader.cc | 154 ++++++++++
.../io/ipc/proxy/record_batch_stream_reader.h | 44 +++
matlab/src/cpp/arrow/matlab/proxy/factory.cc | 2 +
.../+arrow/+io/+ipc/RecordBatchStreamReader.m | 83 +++++
.../test/arrow/io/ipc/tRecordBatchStreamReader.m | 336 +++++++++++++++++++++
matlab/tools/cmake/BuildMatlabArrowInterface.cmake | 1 +
7 files changed, 622 insertions(+)
diff --git a/matlab/src/cpp/arrow/matlab/error/error.h
b/matlab/src/cpp/arrow/matlab/error/error.h
index e5a5df6f4b..425e089d9f 100644
--- a/matlab/src/cpp/arrow/matlab/error/error.h
+++ b/matlab/src/cpp/arrow/matlab/error/error.h
@@ -249,5 +249,7 @@ static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
"arrow:io:ipc:FailedToOpenRecordBatchReader";
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX =
"arrow:io:ipc:InvalidIndex";
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
+static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";
+static const char* IPC_END_OF_STREAM = "arrow:io:ipc:EndOfStream";
} // namespace arrow::matlab::error
diff --git
a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc
b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc
new file mode 100644
index 0000000000..f3c833484d
--- /dev/null
+++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
+#include "arrow/io/file.h"
+#include "arrow/matlab/error/error.h"
+#include "arrow/matlab/tabular/proxy/record_batch.h"
+#include "arrow/matlab/tabular/proxy/schema.h"
+#include "arrow/matlab/tabular/proxy/table.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+#include "arrow/util/utf8.h"
+
+#include "libmexclass/proxy/ProxyManager.h"
+
+namespace arrow::matlab::io::ipc::proxy {
+
+// Wraps an already-opened Arrow IPC stream reader and registers the proxy methods
+// that the MATLAB arrow.io.ipc.RecordBatchStreamReader class calls.
+//
+// The reader is taken by (non-const) value and moved into the member. A `const`
+// parameter would silently turn std::move into a copy of the shared_ptr.
+RecordBatchStreamReader::RecordBatchStreamReader(
+    std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader)
+    : reader{std::move(reader)} {
+  REGISTER_METHOD(RecordBatchStreamReader, getSchema);
+  REGISTER_METHOD(RecordBatchStreamReader, readRecordBatch);
+  REGISTER_METHOD(RecordBatchStreamReader, hasNextRecordBatch);
+  REGISTER_METHOD(RecordBatchStreamReader, readTable);
+}
+
+// Factory used by the proxy registry. It expects a scalar MATLAB struct whose
+// "Filename" field holds the path of the Arrow IPC Stream file to open.
+libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
+    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
+  namespace mda = ::matlab::data;
+
+  const mda::StructArray options = constructor_arguments[0];
+  const mda::StringArray filename_field = options[0]["Filename"];
+
+  // MATLAB strings arrive as UTF-16; Arrow's file APIs want UTF-8.
+  const auto filename_utf16 = std::u16string(filename_field[0]);
+  MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
+                         arrow::util::UTF16StringToUTF8(filename_utf16),
+                         error::UNICODE_CONVERSION_ERROR_ID);
+
+  // Open the file, then wrap it in an IPC stream reader. The second step fails
+  // when the file is not in the Arrow IPC Stream format.
+  MATLAB_ASSIGN_OR_ERROR(auto file, arrow::io::ReadableFile::Open(filename_utf8),
+                         error::FAILED_TO_OPEN_FILE_FOR_READ);
+  MATLAB_ASSIGN_OR_ERROR(auto stream_reader,
+                         arrow::ipc::RecordBatchStreamReader::Open(file),
+                         error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
+
+  // Inside this class, the unqualified name refers to the proxy type itself.
+  return std::make_shared<RecordBatchStreamReader>(std::move(stream_reader));
+}
+
+// Returns (as context.outputs[0]) the ID of a new Schema proxy that wraps the
+// schema of the underlying IPC stream.
+void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
+
+  const auto proxy_id = libmexclass::proxy::ProxyManager::manageProxy(
+      std::make_shared<SchemaProxy>(reader->schema()));
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(proxy_id);
+}
+
+// Reads every record batch not yet returned to MATLAB and combines them into one
+// Table. The new Table proxy's ID is returned as context.outputs[0].
+//
+// Errors with IPC_TABLE_READ_FAILED if the stream cannot be read or the batches
+// cannot be combined.
+void RecordBatchStreamReader::readTable(libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using TableProxy = arrow::matlab::tabular::proxy::Table;
+
+  // hasNextRecordBatch may already have pulled the next batch off the stream and
+  // cached it in nextRecordBatch. Reading only from `reader` would start after that
+  // batch and silently drop it, so the cached batch goes first.
+  arrow::RecordBatchVector batches;
+  if (nextRecordBatch) {
+    batches.push_back(nextRecordBatch);
+  }
+  MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto remaining_batches, reader->ToRecordBatches(),
+                                      context, error::IPC_TABLE_READ_FAILED);
+  for (auto& batch : remaining_batches) {
+    batches.push_back(std::move(batch));
+  }
+
+  // Build against the stream's schema so that an exhausted stream still yields a
+  // correctly typed, zero-row table.
+  MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(
+      auto table, arrow::Table::FromRecordBatches(reader->schema(), batches), context,
+      error::IPC_TABLE_READ_FAILED);
+
+  // Every batch, including any cached one, has now been consumed. Clear the cache
+  // so hasNextRecordBatch consults the stream again.
+  nextRecordBatch = nullptr;
+
+  auto table_proxy = std::make_shared<TableProxy>(std::move(table));
+  const auto table_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(table_proxy);
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(table_proxy_id);
+}
+
+// Returns (as context.outputs[0]) the ID of a RecordBatch proxy for the next batch.
+//
+// Errors with IPC_RECORD_BATCH_READ_FAILED if reading fails, and with
+// IPC_END_OF_STREAM once the stream has no more batches.
+void RecordBatchStreamReader::readRecordBatch(
+    libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch;
+
+  // Reuse a batch that hasNextRecordBatch already pulled off the stream. Only go
+  // back to the stream itself when nothing is cached.
+  if (nextRecordBatch == nullptr) {
+    MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context,
+                                        error::IPC_RECORD_BATCH_READ_FAILED);
+  }
+
+  // reader->Next() can succeed and still return a null batch. That signals the end
+  // of the stream, so report it as an EndOfStream error.
+  if (nextRecordBatch == nullptr) {
+    context.error = libmexclass::error::Error{
+        error::IPC_END_OF_STREAM,
+        "Reached end of Arrow IPC Stream. No more record batches to read."};
+    return;
+  }
+
+  const auto batch_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(
+      std::make_shared<RecordBatchProxy>(nextRecordBatch));
+
+  // The proxy now holds the batch. Drop the cache so the next
+  // hasNextRecordBatch/readRecordBatch call reads from the stream again.
+  nextRecordBatch.reset();
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(batch_proxy_id);
+}
+
+// Returns (as context.outputs[0]) a logical scalar: true if another record batch
+// can be read.
+//
+// Answering requires reading the next batch from the stream. That batch is cached
+// in nextRecordBatch so the following readRecordBatch call returns it instead of
+// skipping it.
+//
+// A failed read is reported as IPC_RECORD_BATCH_READ_FAILED. Previously such a
+// failure was swallowed and misreported as "no more batches", which made a broken
+// stream indistinguishable from a cleanly finished one.
+void RecordBatchStreamReader::hasNextRecordBatch(
+    libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+
+  if (!nextRecordBatch) {
+    MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context,
+                                        error::IPC_RECORD_BATCH_READ_FAILED);
+  }
+
+  // A successful read that yields a null batch means the stream is exhausted.
+  const bool has_next_record_batch = (nextRecordBatch != nullptr);
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(has_next_record_batch);
+}
+
+} // namespace arrow::matlab::io::ipc::proxy
diff --git
a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h
b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h
new file mode 100644
index 0000000000..56fb293987
--- /dev/null
+++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/ipc/reader.h"
+#include "libmexclass/proxy/Proxy.h"
+
+namespace arrow::matlab::io::ipc::proxy {
+
+/// Proxy exposing an arrow::ipc::RecordBatchStreamReader to the MATLAB class
+/// arrow.io.ipc.RecordBatchStreamReader.
+///
+/// hasNextRecordBatch answers by reading the next batch from the stream. That batch
+/// is cached in nextRecordBatch until readRecordBatch returns it.
+class RecordBatchStreamReader : public libmexclass::proxy::Proxy {
+ public:
+  /// Takes shared ownership of an already-opened IPC stream reader.
+  explicit RecordBatchStreamReader(
+      std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader);
+
+  ~RecordBatchStreamReader() = default;
+
+  /// Opens the file named by the "Filename" field of the struct passed from MATLAB.
+  static libmexclass::proxy::MakeResult make(
+      const libmexclass::proxy::FunctionArguments& constructor_arguments);
+
+ protected:
+  /// Underlying Arrow IPC stream reader.
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;
+  /// Batch read ahead by hasNextRecordBatch and not yet returned; null when empty.
+  std::shared_ptr<arrow::RecordBatch> nextRecordBatch;
+
+  void getSchema(libmexclass::proxy::method::Context& context);
+  void readRecordBatch(libmexclass::proxy::method::Context& context);
+  void hasNextRecordBatch(libmexclass::proxy::method::Context& context);
+  void readTable(libmexclass::proxy::method::Context& context);
+};
+
+} // namespace arrow::matlab::io::ipc::proxy
diff --git a/matlab/src/cpp/arrow/matlab/proxy/factory.cc
b/matlab/src/cpp/arrow/matlab/proxy/factory.cc
index a08a7495c0..902546fd05 100644
--- a/matlab/src/cpp/arrow/matlab/proxy/factory.cc
+++ b/matlab/src/cpp/arrow/matlab/proxy/factory.cc
@@ -36,6 +36,7 @@
#include "arrow/matlab/io/feather/proxy/writer.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
+#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/schema.h"
@@ -113,6 +114,7 @@ libmexclass::proxy::MakeResult Factory::make_proxy(
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader ,
arrow::matlab::io::ipc::proxy::RecordBatchFileReader);
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter ,
arrow::matlab::io::ipc::proxy::RecordBatchFileWriter);
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamWriter ,
arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter);
+ REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamReader ,
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader);
// clang-format on
diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
new file mode 100644
index 0000000000..60ca38eba9
--- /dev/null
+++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
@@ -0,0 +1,83 @@
+%RECORDBATCHSTREAMREADER Class for reading Arrow record batches from the
+% Arrow IPC Stream format.
+
+% Licensed to the Apache Software Foundation (ASF) under one or more
+% contributor license agreements. See the NOTICE file distributed with
+% this work for additional information regarding copyright ownership.
+% The ASF licenses this file to you under the Apache License, Version
+% 2.0 (the "License"); you may not use this file except in compliance
+% with the License. You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+% implied. See the License for the specific language governing
+% permissions and limitations under the License.
+
+classdef RecordBatchStreamReader < matlab.mixin.Scalar
+
+    properties (GetAccess=public, SetAccess=private, Hidden)
+        % Handle to the C++ arrow.io.ipc.proxy.RecordBatchStreamReader proxy.
+        Proxy
+    end
+
+    properties (Dependent, GetAccess=public, SetAccess=private)
+        % arrow.tabular.Schema of the record batches in the stream.
+        Schema
+    end
+
+    methods
+        function obj = RecordBatchStreamReader(filename)
+            % Opens FILENAME (a nonzero-length string scalar or char
+            % vector) as an Arrow IPC Stream file.
+            arguments
+                filename(1, 1) string {mustBeNonzeroLengthText}
+            end
+            obj.Proxy = arrow.internal.proxy.create( ...
+                "arrow.io.ipc.proxy.RecordBatchStreamReader", ...
+                struct(Filename=filename));
+        end
+
+        function schema = get.Schema(obj)
+            schemaProxyID = obj.Proxy.getSchema();
+            schemaProxy = libmexclass.proxy.Proxy(ID=schemaProxyID, ...
+                Name="arrow.tabular.proxy.Schema");
+            schema = arrow.tabular.Schema(schemaProxy);
+        end
+
+        function tf = hasnext(obj)
+            % True if at least one more record batch can be read.
+            tf = obj.Proxy.hasNextRecordBatch();
+        end
+
+        function tf = done(obj)
+            % True once no more record batches can be read.
+            tf = ~obj.Proxy.hasNextRecordBatch();
+        end
+
+        function arrowRecordBatch = read(obj)
+            % Short alias for readRecordBatch. The body is duplicated on
+            % purpose: this is often called in a tight loop, and calling the
+            % proxy directly avoids an extra method dispatch through
+            % obj.readRecordBatch.
+            batchProxyID = obj.Proxy.readRecordBatch();
+            batchProxy = libmexclass.proxy.Proxy(ID=batchProxyID, ...
+                Name="arrow.tabular.proxy.RecordBatch");
+            arrowRecordBatch = arrow.tabular.RecordBatch(batchProxy);
+        end
+
+        function arrowRecordBatch = readRecordBatch(obj)
+            % Reads the next record batch. Errors with
+            % arrow:io:ipc:EndOfStream when none remain.
+            batchProxyID = obj.Proxy.readRecordBatch();
+            batchProxy = libmexclass.proxy.Proxy(ID=batchProxyID, ...
+                Name="arrow.tabular.proxy.RecordBatch");
+            arrowRecordBatch = arrow.tabular.RecordBatch(batchProxy);
+        end
+
+        function arrowTable = readTable(obj)
+            % Reads the remaining record batches as an arrow.tabular.Table.
+            tableProxyID = obj.Proxy.readTable();
+            tableProxy = libmexclass.proxy.Proxy(ID=tableProxyID, ...
+                Name="arrow.tabular.proxy.Table");
+            arrowTable = arrow.tabular.Table(tableProxy);
+        end
+
+    end
+
+end
diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m
b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m
new file mode 100644
index 0000000000..6ca6719773
--- /dev/null
+++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m
@@ -0,0 +1,336 @@
+%TRECORDBATCHSTREAMREADER Unit tests for arrow.io.ipc.RecordBatchStreamReader.
+
+% Licensed to the Apache Software Foundation (ASF) under one or more
+% contributor license agreements. See the NOTICE file distributed with
+% this work for additional information regarding copyright ownership.
+% The ASF licenses this file to you under the Apache License, Version
+% 2.0 (the "License"); you may not use this file except in compliance
+% with the License. You may obtain a copy of the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+% implied. See the License for the specific language governing
+% permissions and limitations under the License.
+classdef tRecordBatchStreamReader < matlab.unittest.TestCase
+
+    properties
+        DataFolder
+        ZeroBatchStreamFile
+        OneBatchStreamFile
+        MultipleBatchStreamFile
+        RandomAccessFile
+    end
+
+    properties (TestParameter)
+        % Exercise both read and its alias readRecordBatch.
+        RecordBatchReadFcn = {@read, @readRecordBatch}
+    end
+
+    methods(TestClassSetup)
+
+        function setupDataFolder(testCase)
+            import matlab.unittest.fixtures.TemporaryFolderFixture
+            fixture = testCase.applyFixture(TemporaryFolderFixture);
+            testCase.DataFolder = string(fixture.Folder);
+        end
+
+        function setupRandomAccessFile(testCase)
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            schema = arrow.schema([fieldA, fieldB]);
+            fname = fullfile(testCase.DataFolder, "RandomAccessFile.arrow");
+            writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema);
+            writer.close();
+            testCase.RandomAccessFile = fname;
+        end
+
+        function setupZeroBatchStreamFile(testCase)
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            schema = arrow.schema([fieldA, fieldB]);
+            fname = fullfile(testCase.DataFolder, "ZeroBatchStreamFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, schema);
+            writer.close();
+            testCase.ZeroBatchStreamFile = fname;
+        end
+
+        function setupOneBatchStreamFile(testCase)
+            t = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]);
+            recordBatch = arrow.recordBatch(t);
+            fname = fullfile(testCase.DataFolder, "OneBatchFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, recordBatch.Schema);
+            writer.writeRecordBatch(recordBatch);
+            writer.close();
+            testCase.OneBatchStreamFile = fname;
+        end
+
+        function setupMultipleBatchStreamFile(testCase)
+            t1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]);
+            t2 = table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]);
+            recordBatch1 = arrow.recordBatch(t1);
+            recordBatch2 = arrow.recordBatch(t2);
+            fname = fullfile(testCase.DataFolder, "MultipleBatchStreamFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, recordBatch1.Schema);
+            writer.writeRecordBatch(recordBatch1);
+            writer.writeRecordBatch(recordBatch2);
+            writer.close();
+            testCase.MultipleBatchStreamFile = fname;
+        end
+    end
+
+    methods (Test)
+
+        function ZeroLengthFilenameError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validators:mustBeNonzeroLengthText if the
+            % filename input argument given is a zero length string.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader("");
+            testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText");
+        end
+
+        function MissingStringFilenameError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validators:mustBeNonzeroLengthText if the
+            % filename input argument given is a missing string.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader(string(missing));
+            testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText");
+        end
+
+        function FilenameInvalidTypeError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validation:UnableToConvert if the filename
+            % input argument is neither a scalar string nor a char vector.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader(table);
+            testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert");
+        end
+
+        function Schema(testCase)
+            % Verify the getter method for Schema returns the
+            % expected value.
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            expectedSchema = arrow.schema([fieldA fieldB]);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+        end
+
+        function SchemaNoSetter(testCase)
+            % Verify the Schema property is not settable.
+            fieldC = arrow.field("C", arrow.date32());
+            schema = arrow.schema(fieldC);
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            testCase.verifyError(@() setfield(reader, "Schema", schema), "MATLAB:class:SetProhibited");
+        end
+
+        function ReadErrorIfEndOfStream(testCase, RecordBatchReadFcn)
+            % Verify read/readRecordBatch throws an exception with the
+            % identifier arrow:io:ipc:EndOfStream on an Arrow IPC Stream
+            % file containing zero batches.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn)
+            % Verify read/readRecordBatch can successfully read an Arrow IPC
+            % Stream file containing one batch.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+
+            expectedMatlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]);
+            expected = arrow.recordBatch(expectedMatlabTable);
+            actual = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual, expected);
+
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn)
+            % Verify read/readRecordBatch can successfully read an Arrow IPC
+            % Stream file containing multiple batches.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            expectedMatlabTable1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]);
+            expected1 = arrow.recordBatch(expectedMatlabTable1);
+            actual1 = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual1, expected1);
+
+            expectedMatlabTable2 = table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]);
+            expected2 = arrow.recordBatch(expectedMatlabTable2);
+            actual2 = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual2, expected2);
+
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function HasNext(testCase, RecordBatchReadFcn)
+            % Verify that the hasnext method returns true the correct
+            % number of times depending on the number of record
+            % batches in an Arrow IPC Stream format.
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            % hasnext should return true 0 times for a 0 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 0);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            % hasnext should return true 1 time for a 1 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 1);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            % hasnext should return true 2 times for a 2 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 2);
+        end
+
+        function Done(testCase, RecordBatchReadFcn)
+            % Verify that the done method returns false the correct
+            % number of times depending on the number of record
+            % batches in an Arrow IPC Stream format.
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            % done should return false 0 times for a 0 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 0);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            % done should return false 1 time for a 1 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 1);
+
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            % done should return false 2 times for a 2 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 2);
+        end
+
+        function ReadTableZeroBatchStreamFile(testCase)
+            % Verify readTable can successfully read an Arrow IPC Stream
+            % file containing zero batches as an arrow.tabular.Table.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            matlabTable = table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableOneBatchStreamFile(testCase)
+            % Verify readTable can successfully read an Arrow IPC Stream
+            % file containing one batch as an arrow.tabular.Table.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            matlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableMultipleBatchStreamFile(testCase)
+            % Verify readTable can successfully read an Arrow IPC Stream
+            % file containing multiple batches as an arrow.tabular.Table.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            matlabTable = table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn)
+            % Verify readTable returns only the remaining record batches
+            % in an Arrow IPC Stream file after calling readRecordBatch first.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            testCase.verifyTrue(reader.hasnext());
+            testCase.verifyFalse(reader.done());
+
+            expectedRecordBatch = arrow.recordBatch(...
+                table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]) ...
+            );
+            actualRecordBatch = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actualRecordBatch, expectedRecordBatch);
+
+            expectedTable = arrow.table(...
+                table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]) ...
+            );
+            actualTable = reader.readTable();
+            testCase.verifyEqual(actualTable, expectedTable);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+        end
+
+        function ReadTableMultipleCalls(testCase)
+            % Verify readTable returns an empty table if it is called
+            % multiple times in a row.
+            reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            expected = arrow.table(...
+                table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]) ...
+            );
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+
+            expectedEmpty = arrow.table(...
+                table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]) ...
+            );
+
+            actualEmpty = reader.readTable();
+            testCase.verifyEqual(actualEmpty, expectedEmpty);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+
+            actualEmpty = reader.readTable();
+            testCase.verifyEqual(actualEmpty, expectedEmpty);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+        end
+
+        function ErrorIfNotIpcStreamFile(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier arrow:io:ipc:FailedToOpenRecordBatchReader if
+            % the provided file is not an Arrow IPC Stream file.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile);
+            testCase.verifyError(fcn, "arrow:io:ipc:FailedToOpenRecordBatchReader");
+        end
+
+    end
+
+end
diff --git a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
index 29a737a6ec..27af19676b 100644
--- a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
+++ b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
@@ -83,6 +83,7 @@ set(MATLAB_ARROW_LIBMEXCLASS_CLIENT_PROXY_SOURCES
"${CMAKE_SOURCE_DIR}/src/cpp/a
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_reader.cc"
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc"
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc"
+
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc"
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc")