This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new fc4e2c3 ARROW-1808: [C++] Make RecordBatch, Table virtual interfaces for column access fc4e2c3 is described below commit fc4e2c36d2c56a8bd5d1ab17eeb406826924d3e5 Author: Wes McKinney <wes.mckin...@twosigma.com> AuthorDate: Tue Nov 21 19:01:48 2017 -0500 ARROW-1808: [C++] Make RecordBatch, Table virtual interfaces for column access While this will cause some minor API breakage in parquet-cpp and some other downstream users, this is reasonably long overdue. It will permit implementations of the RecordBatch or Table interface that do lazy IO / data loading or lazy materialization of columns. I will write a patch to fix up parquet-cpp, and will look to see if glib is easy to fix. There's no good way to go about merging this patch since a green build is not possible, so once we're happy with the patch, I can merge this patch and then work on getting a green build in parquet-cpp so we don't have a broken build there for too long Author: Wes McKinney <wes.mckin...@twosigma.com> Author: Kouhei Sutou <k...@clear-code.com> Closes #1337 from wesm/ARROW-1808 and squashes the following commits: 55ce663d [Wes McKinney] clang-format 57923708 [Wes McKinney] Revert to debug build for now 06625301 [Wes McKinney] Fix test case that wasn't being run in debug builds c9a7cb83 [Kouhei Sutou] Apply patch to fix Glib test suite 6b02e438 [Wes McKinney] Move DCHECK in SimpleRecordBatch ctor to Validate 06bced1a [Wes McKinney] Deprecate arrow::MakeTable 498bb082 [Wes McKinney] Fix glib compilation a4e403c3 [Wes McKinney] Set build warning level via environment variable 9a61beb5 [Wes McKinney] Move boxed_columns_ to SimpleTable. Remove DecimalType backwards compat define 7691a5f1 [Wes McKinney] Make Table virtual also, refactor bcc0cd15 [Wes McKinney] Some table refactoring to be virtual 267dd218 [Wes McKinney] Finish RecordBatch refactoring, get tests passing again. 
Add option to set build type in Travis CI ef00e5b9 [Wes McKinney] Split off record batch implementation into separate files, progress refactoring 33f341df [Wes McKinney] Start making things virtual --- .travis.yml | 2 + c_glib/arrow-glib/record-batch.cpp | 5 +- c_glib/arrow-glib/table.cpp | 3 +- c_glib/test/test-file-writer.rb | 16 +- c_glib/test/test-gio-input-stream.rb | 18 +- c_glib/test/test-gio-output-stream.rb | 18 +- c_glib/test/test-stream-writer.rb | 18 +- ci/travis_before_script_cpp.sh | 6 +- ci/travis_env_common.sh | 3 + ci/travis_script_python.sh | 3 + cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/api.h | 1 + cpp/src/arrow/array.h | 2 + cpp/src/arrow/builder.cc | 1 - cpp/src/arrow/builder.h | 1 - cpp/src/arrow/column-benchmark.cc | 1 + cpp/src/arrow/compute/kernel.h | 1 + cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 2 +- cpp/src/arrow/ipc/feather-test.cc | 5 +- cpp/src/arrow/ipc/feather.cc | 1 + cpp/src/arrow/ipc/ipc-json-test.cc | 4 +- cpp/src/arrow/ipc/ipc-read-write-benchmark.cc | 2 +- cpp/src/arrow/ipc/ipc-read-write-test.cc | 19 +- cpp/src/arrow/ipc/json-integration-test.cc | 2 +- cpp/src/arrow/ipc/json-internal.cc | 8 +- cpp/src/arrow/ipc/json.cc | 2 +- cpp/src/arrow/ipc/reader.cc | 4 +- cpp/src/arrow/ipc/reader.h | 3 +- cpp/src/arrow/ipc/test-common.h | 44 ++-- cpp/src/arrow/ipc/writer.cc | 10 +- cpp/src/arrow/pretty_print.cc | 2 +- cpp/src/arrow/python/python-test.cc | 5 +- cpp/src/arrow/python/python_to_arrow.cc | 8 +- cpp/src/arrow/record_batch.cc | 206 +++++++++++++++++ cpp/src/arrow/record_batch.h | 154 +++++++++++++ cpp/src/arrow/table-test.cc | 177 +++++++-------- cpp/src/arrow/table.cc | 315 +++++++++----------------- cpp/src/arrow/table.h | 165 +++----------- cpp/src/arrow/table_builder-test.cc | 5 +- cpp/src/arrow/table_builder.cc | 3 +- cpp/src/arrow/test-common.h | 1 - cpp/src/arrow/test-util.h | 3 +- cpp/src/arrow/type.h | 11 +- python/pyarrow/includes/libarrow.pxd | 11 +- python/pyarrow/table.pxi | 8 +- 45 files changed, 725 
insertions(+), 556 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9c714a6..ddadf73 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,6 +55,7 @@ matrix: - export ARROW_TRAVIS_VALGRIND=1 - export ARROW_TRAVIS_PLASMA=1 - export ARROW_TRAVIS_CLANG_FORMAT=1 + - export ARROW_BUILD_WARNING_LEVEL=CHECKIN - export CC="clang-4.0" - export CXX="clang++-4.0" - $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh @@ -74,6 +75,7 @@ matrix: before_script: - export ARROW_TRAVIS_USE_TOOLCHAIN=1 - export ARROW_TRAVIS_PLASMA=1 + - export ARROW_BUILD_WARNING_LEVEL=CHECKIN - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp index f381af0..f23a0cf 100644 --- a/c_glib/arrow-glib/record-batch.cpp +++ b/c_glib/arrow-glib/record-batch.cpp @@ -150,9 +150,8 @@ garrow_record_batch_new(GArrowSchema *schema, } auto arrow_record_batch = - std::make_shared<arrow::RecordBatch>(garrow_schema_get_raw(schema), - n_rows, - arrow_columns); + arrow::RecordBatch::Make(garrow_schema_get_raw(schema), + n_rows, arrow_columns); return garrow_record_batch_new_raw(&arrow_record_batch); } diff --git a/c_glib/arrow-glib/table.cpp b/c_glib/arrow-glib/table.cpp index 779f2ef..e086396 100644 --- a/c_glib/arrow-glib/table.cpp +++ b/c_glib/arrow-glib/table.cpp @@ -143,8 +143,7 @@ garrow_table_new(GArrowSchema *schema, } auto arrow_table = - std::make_shared<arrow::Table>(garrow_schema_get_raw(schema), - arrow_columns); + arrow::Table::Make(garrow_schema_get_raw(schema), arrow_columns); return garrow_table_new_raw(&arrow_table); } diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb index 3de8e5c..67aed85 100644 --- a/c_glib/test/test-file-writer.rb +++ b/c_glib/test/test-file-writer.rb @@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase include Helper::Buildable def test_write_record_batch + data = [true] + field = 
Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-ipc-file-writer") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +42,12 @@ class TestFileWriter < Test::Unit::TestCase input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb index a71a370..2adf25b 100644 --- a/c_glib/test/test-gio-input-stream.rb +++ b/c_glib/test/test-gio-input-stream.rb @@ -16,15 +16,21 @@ # under the License. 
class TestGIOInputStream < Test::Unit::TestCase + include Helper::Buildable + def test_reader_backend + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-gio-input-stream") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +44,12 @@ class TestGIOInputStream < Test::Unit::TestCase input = Arrow::GIOInputStream.new(input_stream) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb index adaa8c1..c77598e 100644 --- a/c_glib/test/test-gio-output-stream.rb +++ b/c_glib/test/test-gio-output-stream.rb @@ -16,17 +16,23 @@ # under the License. 
class TestGIOOutputStream < Test::Unit::TestCase + include Helper::Buildable + def test_writer_backend + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-gio-output-stream") file = Gio::File.new_for_path(tempfile.path) output_stream = file.append_to(:none) output = Arrow::GIOOutputStream.new(output_stream) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) file_writer = Arrow::RecordBatchFileWriter.new(output, schema) begin - record_batch = Arrow::RecordBatch.new(schema, 0, []) + record_batch = Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]) file_writer.write_record_batch(record_batch) ensure file_writer.close @@ -38,8 +44,12 @@ class TestGIOOutputStream < Test::Unit::TestCase input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin file_reader = Arrow::RecordBatchFileReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], file_reader.schema.fields.collect(&:name)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + file_reader.read_record_batch(0)) ensure input.close end diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb index c3d0e14..32754e2 100644 --- a/c_glib/test/test-stream-writer.rb +++ b/c_glib/test/test-stream-writer.rb @@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase include Helper::Buildable def test_write_record_batch + data = [true] + field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) + schema = Arrow::Schema.new([field]) + tempfile = Tempfile.open("arrow-ipc-stream-writer") output = Arrow::FileOutputStream.new(tempfile.path, false) begin - field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new) - schema = Arrow::Schema.new([field]) stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema) begin columns = [ - 
build_boolean_array([true]), + build_boolean_array(data), ] - record_batch = Arrow::RecordBatch.new(schema, 1, columns) + record_batch = Arrow::RecordBatch.new(schema, data.size, columns) stream_writer.write_record_batch(record_batch) ensure stream_writer.close @@ -41,10 +43,12 @@ class TestStreamWriter < Test::Unit::TestCase input = Arrow::MemoryMappedInputStream.new(tempfile.path) begin stream_reader = Arrow::RecordBatchStreamReader.new(input) - assert_equal(["enabled"], + assert_equal([field.name], stream_reader.schema.fields.collect(&:name)) - assert_equal(true, - stream_reader.read_next.get_column(0).get_value(0)) + assert_equal(Arrow::RecordBatch.new(schema, + data.size, + [build_boolean_array(data)]), + stream_reader.read_next) assert_nil(stream_reader.read_next) ensure input.close diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 4998f19..664f7ce 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -91,12 +91,14 @@ fi if [ $TRAVIS_OS_NAME == "linux" ]; then cmake $CMAKE_COMMON_FLAGS \ $CMAKE_LINUX_FLAGS \ - -DBUILD_WARNING_LEVEL=CHECKIN \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ + -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \ $ARROW_CPP_DIR else cmake $CMAKE_COMMON_FLAGS \ $CMAKE_OSX_FLAGS \ - -DBUILD_WARNING_LEVEL=CHECKIN \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ + -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \ $ARROW_CPP_DIR fi diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh index 52c7da4..21b6e26 100755 --- a/ci/travis_env_common.sh +++ b/ci/travis_env_common.sh @@ -38,6 +38,9 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env export CMAKE_EXPORT_COMPILE_COMMANDS=1 +export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug} +export ARROW_BUILD_WARNING_LEVEL=${ARROW_BUILD_WARNING_LEVEL:=Production} + if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then # C++ toolchain export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain diff --git a/ci/travis_script_python.sh 
b/ci/travis_script_python.sh index 603201b..5f7b0a9 100755 --- a/ci/travis_script_python.sh +++ b/ci/travis_script_python.sh @@ -63,6 +63,7 @@ cmake -GNinja \ -DARROW_BUILD_UTILITIES=off \ -DARROW_PLASMA=on \ -DARROW_PYTHON=on \ + -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ $ARROW_CPP_DIR @@ -78,6 +79,8 @@ if [ "$PYTHON_VERSION" == "2.7" ]; then pip install futures fi +export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE + pip install -r requirements.txt python setup.py build_ext --with-parquet --with-plasma \ install --single-version-externally-managed --record=record.text diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 496e0da..9470578 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -22,6 +22,7 @@ set(ARROW_SRCS compare.cc memory_pool.cc pretty_print.cc + record_batch.cc status.cc table.cc table_builder.cc @@ -144,6 +145,7 @@ install(FILES compare.h memory_pool.h pretty_print.h + record_batch.h status.h table.h table_builder.h diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h index 5d2e859..7cae841 100644 --- a/cpp/src/arrow/api.h +++ b/cpp/src/arrow/api.h @@ -26,6 +26,7 @@ #include "arrow/compare.h" #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 28756a6..dda9dd3 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -279,6 +279,8 @@ class ARROW_EXPORT Array { ARROW_DISALLOW_COPY_AND_ASSIGN(Array); }; +using ArrayVector = std::vector<std::shared_ptr<Array>>; + static inline std::ostream& operator<<(std::ostream& os, const Array& x) { os << x.ToString(); return os; diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 3e213fc..a42f902 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -28,7 +28,6 @@ #include "arrow/buffer.h" 
#include "arrow/compare.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 32741b5..e59e166 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -29,7 +29,6 @@ #include "arrow/buffer.h" #include "arrow/memory_pool.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" diff --git a/cpp/src/arrow/column-benchmark.cc b/cpp/src/arrow/column-benchmark.cc index e50ddf6..af2c368 100644 --- a/cpp/src/arrow/column-benchmark.cc +++ b/cpp/src/arrow/column-benchmark.cc @@ -19,6 +19,7 @@ #include "arrow/array.h" #include "arrow/memory_pool.h" +#include "arrow/table.h" #include "arrow/test-util.h" namespace arrow { diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 7ff506c..0bfa55c 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -22,6 +22,7 @@ #include <vector> #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/util/macros.h" #include "arrow/util/variant.h" diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc index 022268e..a7262c8 100644 --- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc +++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc @@ -27,8 +27,8 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/util/visibility.h" #include "arrow/gpu/cuda_context.h" diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc index 6bd1646..e3de17f 100644 --- a/cpp/src/arrow/ipc/feather-test.cc +++ b/cpp/src/arrow/ipc/feather-test.cc @@ -29,6 +29,7 @@ #include "arrow/ipc/feather.h" #include "arrow/ipc/test-common.h" #include 
"arrow/pretty_print.h" +#include "arrow/table.h" #include "arrow/test-util.h" namespace arrow { @@ -376,8 +377,8 @@ TEST_F(TestTableWriter, TimeTypes) { schema->field(i)->type(), values->length(), buffers, values->null_count(), 0)); } - RecordBatch batch(schema, values->length(), std::move(arrays)); - CheckBatch(batch); + auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays)); + CheckBatch(*batch); } TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) { diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index cea720b..077dc39 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -32,6 +32,7 @@ #include "arrow/ipc/feather-internal.h" #include "arrow/ipc/feather_generated.h" #include "arrow/ipc/util.h" // IWYU pragma: keep +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index a560f09..e496826 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -31,8 +31,8 @@ #include "arrow/ipc/json.h" #include "arrow/ipc/test-common.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/type_traits.h" @@ -269,7 +269,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) { std::vector<std::shared_ptr<Array>> arrays; MakeBatchArrays(schema, num_rows, &arrays); - auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays); + auto batch = RecordBatch::Make(schema, num_rows, arrays); batches.push_back(batch); ASSERT_OK(writer->WriteRecordBatch(*batch)); } diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc index 9ed0abd..8561fb8 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc @@ -63,7 +63,7 @@ 
std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie } auto schema = std::make_shared<Schema>(fields); - return std::make_shared<RecordBatch>(schema, length, arrays); + return RecordBatch::Make(schema, length, arrays); } static void BM_WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index 40cd3f0..1fcbdac 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -197,8 +197,8 @@ class IpcTestFixture : public io::MemoryMapFixture { std::vector<std::shared_ptr<Field>> fields = {f0}; auto schema = std::make_shared<Schema>(fields); - RecordBatch batch(schema, 0, {array}); - CheckRoundtrip(batch, buffer_size); + auto batch = RecordBatch::Make(schema, 0, {array}); + CheckRoundtrip(*batch, buffer_size); } protected: @@ -292,13 +292,13 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) { auto CheckArray = [this](const std::shared_ptr<Array>& array) { auto f0 = field("f0", array->type()); auto schema = ::arrow::schema({f0}); - RecordBatch batch(schema, array->length(), {array}); - auto sliced_batch = batch.Slice(0, 5); + auto batch = RecordBatch::Make(schema, array->length(), {array}); + auto sliced_batch = batch->Slice(0, 5); int64_t full_size; int64_t sliced_size; - ASSERT_OK(GetRecordBatchSize(batch, &full_size)); + ASSERT_OK(GetRecordBatchSize(*batch, &full_size)); ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size)); ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size; @@ -411,8 +411,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { *schema = ::arrow::schema({f0}); - std::vector<std::shared_ptr<Array>> arrays = {array}; - *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays); + *batch = RecordBatch::Make(*schema, batch_length, {array}); std::stringstream ss; ss << "test-write-past-max-recursion-" 
<< g_file_number++; @@ -632,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { std::vector<std::shared_ptr<Field>> fields = {f0}; auto schema = std::make_shared<Schema>(fields); - RecordBatch batch(schema, length, {array}); + auto batch = RecordBatch::Make(schema, length, {array}); std::string path = "test-write-large-record_batch"; @@ -641,8 +640,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_)); std::shared_ptr<RecordBatch> result; - ASSERT_OK(DoLargeRoundTrip(batch, false, &result)); - CheckReadResult(*result, batch); + ASSERT_OK(DoLargeRoundTrip(*batch, false, &result)); + CheckReadResult(*result, *batch); ASSERT_EQ(length, result->num_rows()); } diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index c7530a4..f487487 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -34,8 +34,8 @@ #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index bdf1ef5..bfb3d28 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -28,8 +28,8 @@ #include "arrow/array.h" #include "arrow/builder.h" #include "arrow/ipc/dictionary.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" @@ -125,8 +125,8 @@ class SchemaWriter { // Make a dummy record batch. 
A bit tedious as we have to make a schema auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())}); - RecordBatch batch(schema, dictionary->length(), {dictionary}); - RETURN_NOT_OK(WriteRecordBatch(batch, writer_)); + auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); + RETURN_NOT_OK(WriteRecordBatch(*batch, writer_)); writer_->EndObject(); return Status::OK(); } @@ -1435,7 +1435,7 @@ Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr<Schema>& RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i])); } - *batch = std::make_shared<RecordBatch>(schema, num_rows, columns); + *batch = RecordBatch::Make(schema, num_rows, columns); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index 30a1bb8..ea2947d 100644 --- a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ -24,8 +24,8 @@ #include "arrow/buffer.h" #include "arrow/ipc/json-internal.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8e10d7d..5960e81 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -37,8 +37,8 @@ #include "arrow/ipc/message.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/tensor.h" #include "arrow/type.h" #include "arrow/util/bit-util.h" @@ -307,7 +307,7 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema, arrays[i] = std::move(arr); } - *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays)); + *out = RecordBatch::Make(schema, num_rows, std::move(arrays)); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 7581fbd..627f67e 100644 
--- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -24,13 +24,12 @@ #include <memory> #include "arrow/ipc/message.h" -#include "arrow/table.h" +#include "arrow/record_batch.h" #include "arrow/util/visibility.h" namespace arrow { class Buffer; -class RecordBatch; class Schema; class Status; class Tensor; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 7fc1393..6f8a0dc 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -30,8 +30,8 @@ #include "arrow/builder.h" #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/util/bit-util.h" @@ -184,7 +184,7 @@ Status MakeBooleanBatchSized(const int length, std::shared_ptr<RecordBatch>* out std::shared_ptr<Array> a0, a1; RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0)); RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1)); - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -203,7 +203,7 @@ Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) { MemoryPool* pool = default_memory_pool(); RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -252,7 +252,7 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) { auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, true, pool, &a1); RETURN_NOT_OK(s); } - out->reset(new RecordBatch(schema, length, {a0, a1})); + *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } @@ -261,7 +261,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) { auto f0 = field("f0", 
null()); auto schema = ::arrow::schema({f0}); std::shared_ptr<Array> a0 = std::make_shared<NullArray>(length); - out->reset(new RecordBatch(schema, length, {a0})); + *out = RecordBatch::Make(schema, length, {a0}); return Status::OK(); } @@ -284,7 +284,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -304,7 +304,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, 0, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -327,7 +327,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array}); return Status::OK(); } @@ -347,7 +347,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) { auto f0 = field("f0", type); auto schema = ::arrow::schema({f0}); std::vector<std::shared_ptr<Array>> arrays = {array}; - out->reset(new RecordBatch(schema, batch_length, arrays)); + *out = RecordBatch::Make(schema, batch_length, arrays); return Status::OK(); } @@ -377,7 +377,7 @@ Status 
MakeStruct(std::shared_ptr<RecordBatch>* out) { // construct batch std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls}; - out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays)); + *out = RecordBatch::Make(schema, list_batch->num_rows(), arrays); return Status::OK(); } @@ -445,7 +445,7 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { // construct batch std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -526,7 +526,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -564,7 +564,7 @@ Status MakeDictionaryFlat(std::shared_ptr<RecordBatch>* out) { {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)}); std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2}; - out->reset(new RecordBatch(schema, length, arrays)); + *out = RecordBatch::Make(schema, length, arrays); return Status::OK(); } @@ -584,8 +584,7 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Array> date64_array; ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array); - std::vector<std::shared_ptr<Array>> arrays = {date32_array, date64_array}; - *out = std::make_shared<RecordBatch>(schema, date32_array->length(), arrays); + *out = RecordBatch::Make(schema, date32_array->length(), {date32_array, date64_array}); return Status::OK(); } @@ -604,8 +603,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) { ArrayFromVector<TimestampType, int64_t>(f1->type(), is_valid, ts_values, &a1); ArrayFromVector<TimestampType, int64_t>(f2->type(), is_valid, ts_values, &a2); - ArrayVector arrays = {a0, a1, a2}; - *out = 
std::make_shared<RecordBatch>(schema, a0->length(), arrays); + *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2}); return Status::OK(); } @@ -628,8 +626,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) { ArrayFromVector<Time32Type, int32_t>(f2->type(), is_valid, t32_values, &a2); ArrayFromVector<Time64Type, int64_t>(f3->type(), is_valid, t64_values, &a3); - ArrayVector arrays = {a0, a1, a2, a3}; - *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays); + *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2, a3}); return Status::OK(); } @@ -665,8 +662,7 @@ Status MakeFWBinary(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK(b1.Finish(&a1)); RETURN_NOT_OK(b2.Finish(&a2)); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays); + *out = RecordBatch::Make(schema, a1->length(), {a1, a2}); return Status::OK(); } @@ -695,8 +691,7 @@ Status MakeDecimal(std::shared_ptr<RecordBatch>* out) { auto a2 = std::make_shared<Decimal128Array>(f1->type(), length, data); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared<RecordBatch>(schema, length, arrays); + *out = RecordBatch::Make(schema, length, {a1, a2}); return Status::OK(); } @@ -716,8 +711,7 @@ Status MakeNull(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Array> a2; ArrayFromVector<Int64Type, int64_t>(f1->type(), is_valid, int_values, &a2); - ArrayVector arrays = {a1, a2}; - *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays); + *out = RecordBatch::Make(schema, a1->length(), {a1, a2}); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 323116f..3c1db06 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -32,6 +32,7 @@ #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/tensor.h" @@ -508,12 
+509,9 @@ class DictionaryWriter : public RecordBatchSerializer { dictionary_id_ = dictionary_id; // Make a dummy record batch. A bit tedious as we have to make a schema - std::vector<std::shared_ptr<Field>> fields = { - arrow::field("dictionary", dictionary->type())}; - auto schema = std::make_shared<Schema>(fields); - RecordBatch batch(schema, dictionary->length(), {dictionary}); - - return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length); + auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())}); + auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); + return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length); } private: diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index cfbc303..bd5f8ce 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -22,8 +22,8 @@ #include "arrow/array.h" #include "arrow/pretty_print.h" +#include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc index 86391a1..3b7d7d8 100644 --- a/cpp/src/arrow/python/python-test.cc +++ b/cpp/src/arrow/python/python-test.cc @@ -23,6 +23,7 @@ #include "arrow/array.h" #include "arrow/builder.h" +#include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/python/arrow_to_pandas.h" @@ -81,8 +82,8 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) { std::vector<std::shared_ptr<Field>> fields = {f1, f2, f3}; std::vector<std::shared_ptr<Array>> cols = {arr, arr, arr}; - auto schema = std::make_shared<Schema>(fields); - auto table = std::make_shared<Table>(schema, cols); + auto schema = ::arrow::schema(fields); + auto table = Table::Make(schema, cols); PyObject* out; Py_BEGIN_ALLOW_THREADS; diff --git a/cpp/src/arrow/python/python_to_arrow.cc 
b/cpp/src/arrow/python/python_to_arrow.cc index b0c6287..72cc5b6 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -32,13 +32,15 @@ #include "arrow/builder.h" #include "arrow/io/interfaces.h" #include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" +#include "arrow/tensor.h" +#include "arrow/util/logging.h" + #include "arrow/python/common.h" #include "arrow/python/helpers.h" #include "arrow/python/numpy_convert.h" #include "arrow/python/platform.h" #include "arrow/python/util/datetime.h" -#include "arrow/tensor.h" -#include "arrow/util/logging.h" constexpr int32_t kMaxRecursionDepth = 100; @@ -694,7 +696,7 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts, std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) { auto field = std::make_shared<Field>("list", data->type()); auto schema = ::arrow::schema({field}); - return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data})); + return RecordBatch::Make(schema, data->length(), {data}); } Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) { diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc new file mode 100644 index 0000000..60932bd --- /dev/null +++ b/cpp/src/arrow/record_batch.cc @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/record_batch.h" + +#include <algorithm> +#include <cstdlib> +#include <memory> +#include <sstream> + +#include "arrow/array.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +namespace arrow { + +/// \class SimpleRecordBatch +/// \brief A basic, non-lazy in-memory record batch +class SimpleRecordBatch : public RecordBatch { + public: + SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<Array>>& columns) + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); + boxed_columns_.resize(schema->num_fields()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } + } + + SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + std::vector<std::shared_ptr<Array>>&& columns) + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); + boxed_columns_.resize(schema->num_fields()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = columns[i]->data(); + } + } + + SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + std::vector<std::shared_ptr<ArrayData>>&& columns) + : RecordBatch(schema, num_rows) { + columns_ = std::move(columns); + boxed_columns_.resize(schema->num_fields()); + } + + SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<ArrayData>>& columns) + : RecordBatch(schema, num_rows) { + columns_ = columns; + boxed_columns_.resize(schema->num_fields()); + } + 
+ std::shared_ptr<Array> column(int i) const override { + if (!boxed_columns_[i]) { + boxed_columns_[i] = MakeArray(columns_[i]); + } + DCHECK(boxed_columns_[i]); + return boxed_columns_[i]; + } + + std::shared_ptr<ArrayData> column_data(int i) const override { return columns_[i]; } + + std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const override { + auto new_schema = schema_->AddMetadata(metadata); + return RecordBatch::Make(new_schema, num_rows_, columns_); + } + + std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const override { + std::vector<std::shared_ptr<ArrayData>> arrays; + arrays.reserve(num_columns()); + for (const auto& field : columns_) { + int64_t col_length = std::min(field->length - offset, length); + int64_t col_offset = field->offset + offset; + + auto new_data = std::make_shared<ArrayData>(*field); + new_data->length = col_length; + new_data->offset = col_offset; + new_data->null_count = kUnknownNullCount; + arrays.emplace_back(new_data); + } + int64_t num_rows = std::min(num_rows_ - offset, length); + return std::make_shared<SimpleRecordBatch>(schema_, num_rows, std::move(arrays)); + } + + Status Validate() const override { + if (static_cast<int>(columns_.size()) != schema_->num_fields()) { + return Status::Invalid("Number of columns did not match schema"); + } + return RecordBatch::Validate(); + } + + private: + std::vector<std::shared_ptr<ArrayData>> columns_; + + // Caching boxed array data + mutable std::vector<std::shared_ptr<Array>> boxed_columns_; +}; + +RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows) + : schema_(schema), num_rows_(num_rows) {} + +std::shared_ptr<RecordBatch> RecordBatch::Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<Array>>& columns) { + return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns); +} + +std::shared_ptr<RecordBatch> 
RecordBatch::Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + std::vector<std::shared_ptr<Array>>&& columns) { + return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns)); +} + +std::shared_ptr<RecordBatch> RecordBatch::Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + std::vector<std::shared_ptr<ArrayData>>&& columns) { + return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns)); +} + +std::shared_ptr<RecordBatch> RecordBatch::Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<ArrayData>>& columns) { + return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns); +} + +const std::string& RecordBatch::column_name(int i) const { + return schema_->field(i)->name(); +} + +bool RecordBatch::Equals(const RecordBatch& other) const { + if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { + return false; + } + + for (int i = 0; i < num_columns(); ++i) { + if (!column(i)->Equals(other.column(i))) { + return false; + } + } + + return true; +} + +bool RecordBatch::ApproxEquals(const RecordBatch& other) const { + if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { + return false; + } + + for (int i = 0; i < num_columns(); ++i) { + if (!column(i)->ApproxEquals(other.column(i))) { + return false; + } + } + + return true; +} + +std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const { + return Slice(offset, this->num_rows() - offset); +} + +Status RecordBatch::Validate() const { + for (int i = 0; i < num_columns(); ++i) { + auto arr_shared = this->column_data(i); + const ArrayData& arr = *arr_shared; + if (arr.length != num_rows_) { + std::stringstream ss; + ss << "Number of rows in column " << i << " did not match batch: " << arr.length + << " vs " << num_rows_; + return Status::Invalid(ss.str()); + } + const auto& schema_type = *schema_->field(i)->type(); + if 
(!arr.type->Equals(schema_type)) { + std::stringstream ss; + ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs " + << schema_type.ToString(); + return Status::Invalid(ss.str()); + } + } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Base record batch reader + +RecordBatchReader::~RecordBatchReader() {} + +} // namespace arrow diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h new file mode 100644 index 0000000..b2c4c76 --- /dev/null +++ b/cpp/src/arrow/record_batch.h @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_RECORD_BATCH_H +#define ARROW_RECORD_BATCH_H + +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "arrow/array.h" +#include "arrow/type.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class KeyValueMetadata; +class Status; + +/// \class RecordBatch +/// \brief Collection of equal-length arrays matching a particular Schema +/// +/// A record batch is a table-like data structure that is semantically a sequence +/// of fields, each a contiguous Arrow array +class ARROW_EXPORT RecordBatch { + public: + virtual ~RecordBatch() = default; + + /// \param[in] schema The record batch schema + /// \param[in] num_rows length of fields in the record batch. Each array + /// should have the same length as num_rows + /// \param[in] columns the record batch fields as vector of arrays + static std::shared_ptr<RecordBatch> Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<Array>>& columns); + + /// \brief Move-based constructor for a vector of Array instances + static std::shared_ptr<RecordBatch> Make(const std::shared_ptr<Schema>& schema, + int64_t num_rows, + std::vector<std::shared_ptr<Array>>&& columns); + + /// \brief Construct record batch from vector of internal data structures + /// \since 0.5.0 + /// + /// This class is only provided with an rvalue-reference for the input data, + /// and is intended for internal use, or advanced users. + /// + /// \param schema the record batch schema + /// \param num_rows the number of semantic rows in the record batch. 
This + /// should be equal to the length of each field + /// \param columns the data for the batch's columns + static std::shared_ptr<RecordBatch> Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + std::vector<std::shared_ptr<ArrayData>>&& columns); + + /// \brief Construct record batch by copying vector of array data + /// \since 0.5.0 + static std::shared_ptr<RecordBatch> Make( + const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<ArrayData>>& columns); + + /// \brief Determine if two record batches are exactly equal + /// \return true if batches are equal + bool Equals(const RecordBatch& other) const; + + /// \brief Determine if two record batches are approximately equal + bool ApproxEquals(const RecordBatch& other) const; + + /// \brief The schema shared by the columns of the record batch + /// \return the record batch's schema + std::shared_ptr<Schema> schema() const { return schema_; } + + /// \brief Retrieve an array from the record batch + /// \param[in] i field index, does not boundscheck + /// \return an Array object + virtual std::shared_ptr<Array> column(int i) const = 0; + + /// \brief Retrieve an array's internal data from the record batch + /// \param[in] i field index, does not boundscheck + /// \return an internal ArrayData object + virtual std::shared_ptr<ArrayData> column_data(int i) const = 0; + + virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0; + + /// \brief Name of the i-th column + const std::string& column_name(int i) const; + + /// \return the number of columns in the record batch + int num_columns() const { return schema_->num_fields(); } + + /// \return the number of rows (the corresponding length of each column) + int64_t num_rows() const { return num_rows_; } + + /// \brief Slice each of the arrays in the record batch + /// \param[in] offset the starting offset to slice, through end of batch + /// \return new record batch + virtual 
std::shared_ptr<RecordBatch> Slice(int64_t offset) const; + + /// \brief Slice each of the arrays in the record batch + /// \param[in] offset the starting offset to slice + /// \param[in] length the number of elements to slice from offset + /// \return new record batch + virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0; + + /// \brief Check for schema or length inconsistencies + /// \return Status + virtual Status Validate() const; + + protected: + RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows); + + std::shared_ptr<Schema> schema_; + int64_t num_rows_; + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); +}; + +/// \brief Abstract interface for reading stream of record batches +class ARROW_EXPORT RecordBatchReader { + public: + virtual ~RecordBatchReader(); + + /// \return the shared schema of the record batches in the stream + virtual std::shared_ptr<Schema> schema() const = 0; + + /// Read the next record batch in the stream. 
Return null for batch when + /// reaching end of stream + /// + /// \param[out] batch the next loaded batch, null at end of stream + /// \return Status + virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0; +}; + +} // namespace arrow + +#endif // ARROW_RECORD_BATCH_H diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index b490310..e77d3aa 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/test-common.h" @@ -216,8 +217,8 @@ class TestTable : public TestBase { TEST_F(TestTable, EmptySchema) { auto empty_schema = ::arrow::schema({}); - table_.reset(new Table(empty_schema, columns_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(empty_schema, columns_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(0, table_->num_rows()); ASSERT_EQ(0, table_->num_columns()); } @@ -226,20 +227,20 @@ TEST_F(TestTable, Ctors) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); - auto array_ctor = std::make_shared<Table>(schema_, arrays_); + auto array_ctor = Table::Make(schema_, arrays_); ASSERT_TRUE(table_->Equals(*array_ctor)); - table_.reset(new Table(schema_, columns_, length)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); - ASSERT_OK(MakeTable(schema_, arrays_, &table_)); - ASSERT_OK(table_->ValidateColumns()); + table_ = Table::Make(schema_, arrays_); + ASSERT_OK(table_->Validate()); ASSERT_EQ(length, table_->num_rows()); ASSERT_EQ(3, table_->num_columns()); } @@ -248,7 +249,7 @@ TEST_F(TestTable, 
Metadata) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->schema()->Equals(*schema_)); @@ -262,14 +263,14 @@ TEST_F(TestTable, InvalidColumns) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_, length - 1)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length - 1); + ASSERT_RAISES(Invalid, table_->Validate()); columns_.clear(); // Wrong number of columns - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); columns_ = { std::make_shared<Column>(schema_->field(0), MakeRandomArray<Int32Array>(length)), @@ -277,15 +278,15 @@ TEST_F(TestTable, InvalidColumns) { std::make_shared<Column>(schema_->field(2), MakeRandomArray<Int16Array>(length - 1))}; - table_.reset(new Table(schema_, columns_, length)); - ASSERT_RAISES(Invalid, table_->ValidateColumns()); + table_ = Table::Make(schema_, columns_, length); + ASSERT_RAISES(Invalid, table_->Validate()); } TEST_F(TestTable, Equals) { const int length = 100; MakeExample1(length); - table_.reset(new Table(schema_, columns_)); + table_ = Table::Make(schema_, columns_); ASSERT_TRUE(table_->Equals(*table_)); // Differing schema @@ -294,7 +295,8 @@ TEST_F(TestTable, Equals) { auto f2 = field("f5", int16()); vector<shared_ptr<Field>> fields = {f0, f1, f2}; auto other_schema = std::make_shared<Schema>(fields); - ASSERT_FALSE(table_->Equals(Table(other_schema, columns_))); + auto other = Table::Make(other_schema, columns_); + ASSERT_FALSE(table_->Equals(*other)); // Differing columns std::vector<std::shared_ptr<Column>> other_columns = { std::make_shared<Column>(schema_->field(0), @@ -303,19 +305,21 @@ TEST_F(TestTable, Equals) { MakeRandomArray<UInt8Array>(length, 10)), 
std::make_shared<Column>(schema_->field(2), MakeRandomArray<Int16Array>(length, 10))}; - ASSERT_FALSE(table_->Equals(Table(schema_, other_columns))); + + other = Table::Make(schema_, other_columns); + ASSERT_FALSE(table_->Equals(*other)); } TEST_F(TestTable, FromRecordBatches) { const int64_t length = 10; MakeExample1(length); - auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_); + auto batch1 = RecordBatch::Make(schema_, length, arrays_); std::shared_ptr<Table> result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &result)); - expected = std::make_shared<Table>(schema_, columns_); + expected = Table::Make(schema_, columns_); ASSERT_TRUE(result->Equals(*expected)); std::vector<std::shared_ptr<Column>> other_columns; @@ -325,18 +329,17 @@ TEST_F(TestTable, FromRecordBatches) { } ASSERT_OK(Table::FromRecordBatches({batch1, batch1}, &result)); - expected = std::make_shared<Table>(schema_, other_columns); + expected = Table::Make(schema_, other_columns); ASSERT_TRUE(result->Equals(*expected)); // Error states std::vector<std::shared_ptr<RecordBatch>> empty_batches; ASSERT_RAISES(Invalid, Table::FromRecordBatches(empty_batches, &result)); - std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)}; - auto other_schema = std::make_shared<Schema>(fields); + auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)}); std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]}; - auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays); + auto batch2 = RecordBatch::Make(other_schema, length, other_arrays); ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result)); } @@ -344,11 +347,11 @@ TEST_F(TestTable, ConcatenateTables) { const int64_t length = 10; MakeExample1(length); - auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_); + auto batch1 = RecordBatch::Make(schema_, length, arrays_); // generate different data 
MakeExample1(length); - auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_); + auto batch2 = RecordBatch::Make(schema_, length, arrays_); std::shared_ptr<Table> t1, t2, t3, result, expected; ASSERT_OK(Table::FromRecordBatches({batch1}, &t1)); @@ -362,11 +365,10 @@ TEST_F(TestTable, ConcatenateTables) { std::vector<std::shared_ptr<Table>> empty_tables; ASSERT_RAISES(Invalid, ConcatenateTables(empty_tables, &result)); - std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)}; - auto other_schema = std::make_shared<Schema>(fields); + auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)}); std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]}; - auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays); + auto batch3 = RecordBatch::Make(other_schema, length, other_arrays); ASSERT_OK(Table::FromRecordBatches({batch3}, &t3)); ASSERT_RAISES(Invalid, ConcatenateTables({t1, t3}, &result)); @@ -376,31 +378,38 @@ TEST_F(TestTable, RemoveColumn) { const int64_t length = 10; MakeExample1(length); - Table table(schema_, columns_); + auto table_sp = Table::Make(schema_, columns_); + const Table& table = *table_sp; std::shared_ptr<Table> result; ASSERT_OK(table.RemoveColumn(0, &result)); auto ex_schema = ::arrow::schema({schema_->field(1), schema_->field(2)}); std::vector<std::shared_ptr<Column>> ex_columns = {table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + auto expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.RemoveColumn(1, &result)); ex_schema = ::arrow::schema({schema_->field(0), schema_->field(2)}); ex_columns = {table.column(0), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.RemoveColumn(2, &result)); ex_schema = 
::arrow::schema({schema_->field(0), schema_->field(1)}); ex_columns = {table.column(0), table.column(1)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + expected = Table::Make(ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(*expected)); } TEST_F(TestTable, AddColumn) { const int64_t length = 10; MakeExample1(length); - Table table(schema_, columns_); + auto table_sp = Table::Make(schema_, columns_); + const Table& table = *table_sp; std::shared_ptr<Table> result; // Some negative tests with invalid index @@ -419,50 +428,32 @@ TEST_F(TestTable, AddColumn) { ASSERT_OK(table.AddColumn(0, columns_[0], &result)); auto ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)}); - std::vector<std::shared_ptr<Column>> ex_columns = {table.column(0), table.column(0), - table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + auto expected = Table::Make( + ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)}); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(1, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)}); - ex_columns = {table.column(0), table.column(0), table.column(1), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + + expected = Table::Make( + ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)}); + ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(2, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(1), schema_->field(0), schema_->field(2)}); - ex_columns = {table.column(0), table.column(1), table.column(0), table.column(2)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); + expected = Table::Make( + ex_schema, {table.column(0), table.column(1), table.column(0), table.column(2)}); + 
ASSERT_TRUE(result->Equals(*expected)); ASSERT_OK(table.AddColumn(3, columns_[0], &result)); ex_schema = ::arrow::schema( {schema_->field(0), schema_->field(1), schema_->field(2), schema_->field(0)}); - ex_columns = {table.column(0), table.column(1), table.column(2), table.column(0)}; - ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns))); -} - -TEST_F(TestTable, IsChunked) { - ArrayVector c1, c2; - - auto a1 = MakeRandomArray<Int32Array>(10); - auto a2 = MakeRandomArray<Int32Array>(20); - - auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())}); - - std::vector<std::shared_ptr<Column>> columns; - - std::shared_ptr<RecordBatch> batch; - - columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})}; - auto t1 = std::make_shared<Table>(sch1, columns); - - ASSERT_FALSE(t1->IsChunked()); - - columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})}; - auto t2 = std::make_shared<Table>(sch1, columns); - - ASSERT_TRUE(t2->IsChunked()); + expected = Table::Make( + ex_schema, {table.column(0), table.column(1), table.column(2), table.column(0)}); + ASSERT_TRUE(result->Equals(*expected)); } class TestRecordBatch : public TestBase {}; @@ -475,24 +466,22 @@ TEST_F(TestRecordBatch, Equals) { auto f2 = field("f2", int16()); vector<shared_ptr<Field>> fields = {f0, f1, f2}; - auto schema = std::make_shared<Schema>(fields); + auto schema = ::arrow::schema({f0, f1, f2}); + auto schema2 = ::arrow::schema({f0, f1}); auto a0 = MakeRandomArray<Int32Array>(length); auto a1 = MakeRandomArray<UInt8Array>(length); auto a2 = MakeRandomArray<Int16Array>(length); - RecordBatch b1(schema, length, {a0, a1, a2}); - RecordBatch b3(schema, length, {a0, a1}); - RecordBatch b4(schema, length, {a0, a1, a1}); + auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2}); + auto b3 = RecordBatch::Make(schema2, length, {a0, a1}); + auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1}); - ASSERT_TRUE(b1.Equals(b1)); - ASSERT_FALSE(b1.Equals(b3)); - 
ASSERT_FALSE(b1.Equals(b4)); + ASSERT_TRUE(b1->Equals(*b1)); + ASSERT_FALSE(b1->Equals(*b3)); + ASSERT_FALSE(b1->Equals(*b4)); } -#ifdef NDEBUG -// In debug builds, RecordBatch ctor aborts if you construct an invalid one - TEST_F(TestRecordBatch, Validate) { const int length = 10; @@ -507,21 +496,19 @@ TEST_F(TestRecordBatch, Validate) { auto a2 = MakeRandomArray<Int16Array>(length); auto a3 = MakeRandomArray<Int16Array>(5); - RecordBatch b1(schema, length, {a0, a1, a2}); + auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2}); - ASSERT_OK(b1.Validate()); + ASSERT_OK(b1->Validate()); // Length mismatch - RecordBatch b2(schema, length, {a0, a1, a3}); - ASSERT_RAISES(Invalid, b2.Validate()); + auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3}); + ASSERT_RAISES(Invalid, b2->Validate()); // Type mismatch - RecordBatch b3(schema, length, {a0, a1, a0}); - ASSERT_RAISES(Invalid, b3.Validate()); + auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0}); + ASSERT_RAISES(Invalid, b3->Validate()); } -#endif - TEST_F(TestRecordBatch, Slice) { const int length = 10; @@ -529,19 +516,19 @@ TEST_F(TestRecordBatch, Slice) { auto f1 = field("f1", uint8()); vector<shared_ptr<Field>> fields = {f0, f1}; - auto schema = std::make_shared<Schema>(fields); + auto schema = ::arrow::schema(fields); auto a0 = MakeRandomArray<Int32Array>(length); auto a1 = MakeRandomArray<UInt8Array>(length); - RecordBatch batch(schema, length, {a0, a1}); + auto batch = RecordBatch::Make(schema, length, {a0, a1}); - auto batch_slice = batch.Slice(2); - auto batch_slice2 = batch.Slice(1, 5); + auto batch_slice = batch->Slice(2); + auto batch_slice2 = batch->Slice(1, 5); - ASSERT_EQ(batch_slice->num_rows(), batch.num_rows() - 2); + ASSERT_EQ(batch_slice->num_rows(), batch->num_rows() - 2); - for (int i = 0; i < batch.num_columns(); ++i) { + for (int i = 0; i < batch->num_columns(); ++i) { ASSERT_EQ(2, batch_slice->column(i)->offset()); ASSERT_EQ(length - 2, batch_slice->column(i)->length()); @@ 
-567,9 +554,9 @@ TEST_F(TestTableBatchReader, ReadNext) { std::shared_ptr<RecordBatch> batch; columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2, a2})}; - Table t1(sch1, columns); + auto t1 = Table::Make(sch1, columns); - TableBatchReader i1(t1); + TableBatchReader i1(*t1); ASSERT_OK(i1.ReadNext(&batch)); ASSERT_EQ(10, batch->num_rows()); @@ -584,9 +571,9 @@ TEST_F(TestTableBatchReader, ReadNext) { ASSERT_EQ(nullptr, batch); columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})}; - Table t2(sch1, columns); + auto t2 = Table::Make(sch1, columns); - TableBatchReader i2(t2); + TableBatchReader i2(*t2); ASSERT_OK(i2.ReadNext(&batch)); ASSERT_EQ(10, batch->num_rows()); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index fe19bf4..8f3f195 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -23,6 +23,7 @@ #include <sstream> #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/util/logging.h" @@ -153,171 +154,126 @@ Status Column::ValidateData() { } // ---------------------------------------------------------------------- -// RecordBatch methods - -RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows) - : schema_(schema), num_rows_(num_rows) { - boxed_columns_.resize(schema->num_fields()); -} - -RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - const std::vector<std::shared_ptr<Array>>& columns) - : RecordBatch(schema, num_rows) { - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - -RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - std::vector<std::shared_ptr<Array>>&& columns) - : RecordBatch(schema, num_rows) { - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = columns[i]->data(); - } -} - 
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - std::vector<std::shared_ptr<ArrayData>>&& columns) - : RecordBatch(schema, num_rows) { - columns_ = std::move(columns); -} - -RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - const std::vector<std::shared_ptr<ArrayData>>& columns) - : RecordBatch(schema, num_rows) { - columns_ = columns; -} - -std::shared_ptr<Array> RecordBatch::column(int i) const { - if (!boxed_columns_[i]) { - boxed_columns_[i] = MakeArray(columns_[i]); - } - DCHECK(boxed_columns_[i]); - return boxed_columns_[i]; -} - -const std::string& RecordBatch::column_name(int i) const { - return schema_->field(i)->name(); -} - -bool RecordBatch::Equals(const RecordBatch& other) const { - if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { - return false; - } +// Table methods - for (int i = 0; i < num_columns(); ++i) { - if (!column(i)->Equals(other.column(i))) { - return false; +/// \class SimpleTable +/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch +class SimpleTable : public Table { + public: + SimpleTable(const std::shared_ptr<Schema>& schema, + const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1) + : columns_(columns) { + schema_ = schema; + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; } } - return true; -} - -bool RecordBatch::ApproxEquals(const RecordBatch& other) const { - if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { - return false; - } + SimpleTable(const std::shared_ptr<Schema>& schema, + const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) { + schema_ = schema; + if (num_rows < 0) { + if (columns.size() == 0) { + num_rows_ = 0; + } else { + num_rows_ = columns[0]->length(); + } + } else { + num_rows_ = num_rows; + } - for (int i = 0; i < 
num_columns(); ++i) { - if (!column(i)->ApproxEquals(other.column(i))) { - return false; + columns_.resize(columns.size()); + for (size_t i = 0; i < columns.size(); ++i) { + columns_[i] = + std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]); } } - return true; -} - -std::shared_ptr<RecordBatch> RecordBatch::ReplaceSchemaMetadata( - const std::shared_ptr<const KeyValueMetadata>& metadata) const { - auto new_schema = schema_->AddMetadata(metadata); - return std::make_shared<RecordBatch>(new_schema, num_rows_, columns_); -} + std::shared_ptr<Column> column(int i) const override { return columns_[i]; } -std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const { - return Slice(offset, this->num_rows() - offset); -} + Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override { + std::shared_ptr<Schema> new_schema; + RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); -std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const { - std::vector<std::shared_ptr<ArrayData>> arrays; - arrays.reserve(num_columns()); - for (const auto& field : columns_) { - int64_t col_length = std::min(field->length - offset, length); - int64_t col_offset = field->offset + offset; - - auto new_data = std::make_shared<ArrayData>(*field); - new_data->length = col_length; - new_data->offset = col_offset; - new_data->null_count = kUnknownNullCount; - arrays.emplace_back(new_data); + *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i)); + return Status::OK(); } - int64_t num_rows = std::min(num_rows_ - offset, length); - return std::make_shared<RecordBatch>(schema_, num_rows, std::move(arrays)); -} -Status RecordBatch::Validate() const { - for (int i = 0; i < num_columns(); ++i) { - const ArrayData& arr = *columns_[i]; - if (arr.length != num_rows_) { + Status AddColumn(int i, const std::shared_ptr<Column>& col, + std::shared_ptr<Table>* out) const override { + if (i < 0 || i > num_columns() + 1) { + 
return Status::Invalid("Invalid column index."); + } + if (col == nullptr) { std::stringstream ss; - ss << "Number of rows in column " << i << " did not match batch: " << arr.length - << " vs " << num_rows_; + ss << "Column " << i << " was null"; return Status::Invalid(ss.str()); } - const auto& schema_type = *schema_->field(i)->type(); - if (!arr.type->Equals(schema_type)) { + if (col->length() != num_rows_) { std::stringstream ss; - ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs " - << schema_type.ToString(); + ss << "Added column's length must match table's length. Expected length " + << num_rows_ << " but got length " << col->length(); return Status::Invalid(ss.str()); } + + std::shared_ptr<Schema> new_schema; + RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); + + *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col)); + return Status::OK(); } - return Status::OK(); -} -// ---------------------------------------------------------------------- -// Table methods + std::shared_ptr<Table> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const override { + auto new_schema = schema_->AddMetadata(metadata); + return Table::Make(new_schema, columns_); + } -Table::Table(const std::shared_ptr<Schema>& schema, - const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows) - : schema_(schema), columns_(columns) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else { - num_rows_ = columns[0]->length(); + Status Validate() const override { + if (static_cast<int>(columns_.size()) != schema_->num_fields()) { + return Status::Invalid("Number of columns did not match schema"); } - } else { - num_rows_ = num_rows; - } -} -Table::Table(const std::shared_ptr<Schema>& schema, - const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows) - : schema_(schema) { - if (num_rows < 0) { - if (columns.size() == 0) { - num_rows_ = 0; - } else 
{ - num_rows_ = columns[0]->length(); + // Make sure columns are all the same length + for (int i = 0; i < num_columns(); ++i) { + const Column* col = columns_[i].get(); + if (col == nullptr) { + std::stringstream ss; + ss << "Column " << i << " was null"; + return Status::Invalid(ss.str()); + } + if (col->length() != num_rows_) { + std::stringstream ss; + ss << "Column " << i << " named " << col->name() << " expected length " + << num_rows_ << " but got length " << col->length(); + return Status::Invalid(ss.str()); + } } - } else { - num_rows_ = num_rows; + return Status::OK(); } - columns_.resize(columns.size()); - for (size_t i = 0; i < columns.size(); ++i) { - columns_[i] = - std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]); - } + private: + std::vector<std::shared_ptr<Column>> columns_; +}; + +Table::Table() {} + +std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema, + const std::vector<std::shared_ptr<Column>>& columns, + int64_t num_rows) { + return std::make_shared<SimpleTable>(schema, columns, num_rows); } -std::shared_ptr<Table> Table::ReplaceSchemaMetadata( - const std::shared_ptr<const KeyValueMetadata>& metadata) const { - auto new_schema = schema_->AddMetadata(metadata); - return std::make_shared<Table>(new_schema, columns_); +std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema, + const std::vector<std::shared_ptr<Array>>& arrays, + int64_t num_rows) { + return std::make_shared<SimpleTable>(schema, arrays, num_rows); } Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches, @@ -351,7 +307,7 @@ Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); } - *table = std::make_shared<Table>(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } @@ -388,7 +344,7 @@ Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables, 
} columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); } - *table = std::make_shared<Table>(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } @@ -399,82 +355,19 @@ bool Table::Equals(const Table& other) const { if (!schema_->Equals(*other.schema())) { return false; } - if (static_cast<int64_t>(columns_.size()) != other.num_columns()) { + if (this->num_columns() != other.num_columns()) { return false; } - for (int i = 0; i < static_cast<int>(columns_.size()); i++) { - if (!columns_[i]->Equals(other.column(i))) { + for (int i = 0; i < this->num_columns(); i++) { + if (!this->column(i)->Equals(other.column(i))) { return false; } } return true; } -Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const { - std::shared_ptr<Schema> new_schema; - RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); - - *out = std::make_shared<Table>(new_schema, internal::DeleteVectorElement(columns_, i)); - return Status::OK(); -} - -Status Table::AddColumn(int i, const std::shared_ptr<Column>& col, - std::shared_ptr<Table>* out) const { - if (i < 0 || i > num_columns() + 1) { - return Status::Invalid("Invalid column index."); - } - if (col == nullptr) { - std::stringstream ss; - ss << "Column " << i << " was null"; - return Status::Invalid(ss.str()); - } - if (col->length() != num_rows_) { - std::stringstream ss; - ss << "Added column's length must match table's length. 
Expected length " << num_rows_ - << " but got length " << col->length(); - return Status::Invalid(ss.str()); - } - - std::shared_ptr<Schema> new_schema; - RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); - - *out = - std::make_shared<Table>(new_schema, internal::AddVectorElement(columns_, i, col)); - return Status::OK(); -} - -Status Table::ValidateColumns() const { - if (num_columns() != schema_->num_fields()) { - return Status::Invalid("Number of columns did not match schema"); - } - - // Make sure columns are all the same length - for (size_t i = 0; i < columns_.size(); ++i) { - const Column* col = columns_[i].get(); - if (col == nullptr) { - std::stringstream ss; - ss << "Column " << i << " was null"; - return Status::Invalid(ss.str()); - } - if (col->length() != num_rows_) { - std::stringstream ss; - ss << "Column " << i << " named " << col->name() << " expected length " << num_rows_ - << " but got length " << col->length(); - return Status::Invalid(ss.str()); - } - } - return Status::OK(); -} - -bool Table::IsChunked() const { - for (size_t i = 0; i < columns_.size(); ++i) { - if (columns_[i]->data()->num_chunks() > 1) { - return true; - } - } - return false; -} +#ifndef ARROW_NO_DEPRECATED_API Status MakeTable(const std::shared_ptr<Schema>& schema, const std::vector<std::shared_ptr<Array>>& arrays, @@ -493,15 +386,12 @@ Status MakeTable(const std::shared_ptr<Schema>& schema, columns.emplace_back(std::make_shared<Column>(schema->field(i), arrays[i])); } - *table = std::make_shared<Table>(schema, columns); + *table = Table::Make(schema, columns); return Status::OK(); } -// ---------------------------------------------------------------------- -// Base record batch reader - -RecordBatchReader::~RecordBatchReader() {} +#endif // ARROW_NO_DEPRECATED_API // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches @@ -565,8 +455,7 @@ class TableBatchReader::TableBatchReaderImpl { } 
absolute_row_position_ += chunksize; - *out = - std::make_shared<RecordBatch>(table_.schema(), chunksize, std::move(batch_data)); + *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); return Status::OK(); } diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 2cff32f..d0312d9 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -24,6 +24,7 @@ #include <vector> #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/type.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -33,8 +34,6 @@ namespace arrow { class KeyValueMetadata; class Status; -using ArrayVector = std::vector<std::shared_ptr<Array>>; - /// \class ChunkedArray /// \brief A data structure managing a list of primitive Arrow arrays logically /// as one large array @@ -113,123 +112,28 @@ class ARROW_EXPORT Column { ARROW_DISALLOW_COPY_AND_ASSIGN(Column); }; -/// \class RecordBatch -/// \brief Collection of equal-length arrays matching a particular Schema -/// -/// A record batch is table-like data structure consisting of an internal -/// sequence of fields, each a contiguous Arrow array -class ARROW_EXPORT RecordBatch { - public: - /// \param[in] schema The record batch schema - /// \param[in] num_rows length of fields in the record batch. Each array - /// should have the same length as num_rows - /// \param[in] columns the record batch fields as vector of arrays - RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - const std::vector<std::shared_ptr<Array>>& columns); - - /// \brief Move-based constructor for a vector of Array instances - RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - std::vector<std::shared_ptr<Array>>&& columns); - - /// \brief Construct record batch from vector of internal data structures - /// \since 0.5.0 - /// - /// This class is only provided with an rvalue-reference for the input data, - /// and is intended for internal use, or advanced users. 
- /// - /// \param schema the record batch schema - /// \param num_rows the number of semantic rows in the record batch. This - /// should be equal to the length of each field - /// \param columns the data for the batch's columns - RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - std::vector<std::shared_ptr<ArrayData>>&& columns); - - /// \brief Construct record batch by copying vector of array data - /// \since 0.5.0 - RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, - const std::vector<std::shared_ptr<ArrayData>>& columns); - - /// \brief Determine if two record batches are exactly equal - /// \return true if batches are equal - bool Equals(const RecordBatch& other) const; - - /// \brief Determine if two record batches are approximately equal - bool ApproxEquals(const RecordBatch& other) const; - - // \return the table's schema - /// \return true if batches are equal - std::shared_ptr<Schema> schema() const { return schema_; } - - /// \brief Retrieve an array from the record batch - /// \param[in] i field index, does not boundscheck - /// \return an Array object - std::shared_ptr<Array> column(int i) const; - - std::shared_ptr<ArrayData> column_data(int i) const { return columns_[i]; } - - /// \brief Name in i-th column - const std::string& column_name(int i) const; - - /// \return the number of columns in the table - int num_columns() const { return static_cast<int>(columns_.size()); } - - /// \return the number of rows (the corresponding length of each column) - int64_t num_rows() const { return num_rows_; } - - /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) - /// \since 0.5.0 - /// - /// \param[in] metadata new KeyValueMetadata - /// \return new RecordBatch - std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( - const std::shared_ptr<const KeyValueMetadata>& metadata) const; - - /// \brief Slice each of the arrays in the record batch - /// \param[in] offset the starting offset to slice, 
through end of batch - /// \return new record batch - std::shared_ptr<RecordBatch> Slice(int64_t offset) const; - - /// \brief Slice each of the arrays in the record batch - /// \param[in] offset the starting offset to slice - /// \param[in] length the number of elements to slice from offset - /// \return new record batch - std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const; - - /// \brief Check for schema or length inconsistencies - /// \return Status - Status Validate() const; - - private: - ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); - - RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows); - - std::shared_ptr<Schema> schema_; - int64_t num_rows_; - std::vector<std::shared_ptr<ArrayData>> columns_; - - // Caching boxed array data - mutable std::vector<std::shared_ptr<Array>> boxed_columns_; -}; - /// \class Table /// \brief Logical table as sequence of chunked arrays class ARROW_EXPORT Table { public: + virtual ~Table() = default; + /// \brief Construct Table from schema and columns /// If columns is zero-length, the table's number of rows is zero /// \param schema The table schema (column types) /// \param columns The table's columns /// \param num_rows number of rows in table, -1 (default) to infer from columns - Table(const std::shared_ptr<Schema>& schema, - const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1); + static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema, + const std::vector<std::shared_ptr<Column>>& columns, + int64_t num_rows = -1); /// \brief Construct Table from schema and arrays /// \param schema The table schema (column types) /// \param arrays The table's columns as arrays /// \param num_rows number of rows in table, -1 (default) to infer from columns - Table(const std::shared_ptr<Schema>& schema, - const std::vector<std::shared_ptr<Array>>& arrays, int64_t num_rows = -1); + static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema, + const 
std::vector<std::shared_ptr<Array>>& arrays, + int64_t num_rows = -1); // Construct table from RecordBatch, but only if all of the batch schemas are // equal. Returns Status::Invalid if there is some problem @@ -242,25 +146,28 @@ class ARROW_EXPORT Table { /// \param[in] i column index, does not boundscheck /// \return the i-th column - std::shared_ptr<Column> column(int i) const { return columns_[i]; } + virtual std::shared_ptr<Column> column(int i) const = 0; /// \brief Remove column from the table, producing a new Table - Status RemoveColumn(int i, std::shared_ptr<Table>* out) const; + virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0; /// \brief Add column to the table, producing a new Table - Status AddColumn(int i, const std::shared_ptr<Column>& column, - std::shared_ptr<Table>* out) const; + virtual Status AddColumn(int i, const std::shared_ptr<Column>& column, + std::shared_ptr<Table>* out) const = 0; /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) /// \since 0.5.0 /// /// \param[in] metadata new KeyValueMetadata /// \return new Table - std::shared_ptr<Table> ReplaceSchemaMetadata( - const std::shared_ptr<const KeyValueMetadata>& metadata) const; + virtual std::shared_ptr<Table> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0; + + /// \brief Perform any checks to validate the input arguments + virtual Status Validate() const = 0; /// \return the number of columns in the table - int num_columns() const { return static_cast<int>(columns_.size()); } + int num_columns() const { return schema_->num_fields(); } /// \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } @@ -268,35 +175,14 @@ class ARROW_EXPORT Table { /// \brief Determine if semantic contents of tables are exactly equal bool Equals(const Table& other) const; - /// \brief Perform any checks to validate the input arguments - Status 
ValidateColumns() const; - - /// \brief Return true if any column has multiple chunks - bool IsChunked() const; - - private: - ARROW_DISALLOW_COPY_AND_ASSIGN(Table); + protected: + Table(); std::shared_ptr<Schema> schema_; - std::vector<std::shared_ptr<Column>> columns_; - int64_t num_rows_; -}; - -/// \brief Abstract interface for reading stream of record batches -class ARROW_EXPORT RecordBatchReader { - public: - virtual ~RecordBatchReader(); - /// \return the shared schema of the record batches in the stream - virtual std::shared_ptr<Schema> schema() const = 0; - - /// Read the next record batch in the stream. Return null for batch when - /// reaching end of stream - /// - /// \param[out] batch the next loaded batch, null at end of stream - /// \return Status - virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0; + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(Table); }; /// \brief Compute a sequence of record batches from a (possibly chunked) Table @@ -322,13 +208,18 @@ ARROW_EXPORT Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table); +#ifndef ARROW_NO_DEPRECATED_API + /// \brief Construct table from multiple input tables. 
/// \return Status, fails if any schemas are different +/// \note Deprecated since 0.8.0 ARROW_EXPORT Status MakeTable(const std::shared_ptr<Schema>& schema, const std::vector<std::shared_ptr<Array>>& arrays, std::shared_ptr<Table>* table); +#endif + } // namespace arrow #endif // ARROW_TABLE_H diff --git a/cpp/src/arrow/table_builder-test.cc b/cpp/src/arrow/table_builder-test.cc index 07d9b6b..8167577 100644 --- a/cpp/src/arrow/table_builder-test.cc +++ b/cpp/src/arrow/table_builder-test.cc @@ -22,6 +22,7 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -98,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) { ASSERT_OK(ex_b1.Finish(&a1)); ASSERT_OK(ex_b2.Finish(&a2)); - RecordBatch expected(schema, 4, {a0, a1, a2}); + auto expected = RecordBatch::Make(schema, 4, {a0, a1, a2}); // Builder attributes ASSERT_EQ(3, builder->num_fields()); @@ -119,7 +120,7 @@ TEST_F(TestRecordBatchBuilder, Basics) { ASSERT_OK(builder->Flush(&batch)); } - ASSERT_BATCHES_EQUAL(expected, *batch); + ASSERT_BATCHES_EQUAL(*expected, *batch); } // Test setting initial capacity diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc index a1bd959..379d886 100644 --- a/cpp/src/arrow/table_builder.cc +++ b/cpp/src/arrow/table_builder.cc @@ -24,6 +24,7 @@ #include "arrow/array.h" #include "arrow/builder.h" +#include "arrow/record_batch.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" @@ -64,7 +65,7 @@ Status RecordBatchBuilder::Flush(bool reset_builders, } length = fields[i]->length(); } - *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields)); + *batch = RecordBatch::Make(schema_, length, std::move(fields)); if (reset_builders) { return InitBuilders(); } else { diff --git a/cpp/src/arrow/test-common.h b/cpp/src/arrow/test-common.h index a4c4fdd..911adf7 100644 --- a/cpp/src/arrow/test-common.h +++ 
b/cpp/src/arrow/test-common.h @@ -30,7 +30,6 @@ #include "arrow/buffer.h" #include "arrow/builder.h" #include "arrow/memory_pool.h" -#include "arrow/table.h" #include "arrow/test-util.h" namespace arrow { diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 77f489a..1a34808 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -35,7 +35,6 @@ #include "arrow/memory_pool.h" #include "arrow/pretty_print.h" #include "arrow/status.h" -#include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" @@ -375,7 +374,7 @@ void AssertArraysEqual(const Array& expected, const Array& actual) { #define ASSERT_BATCHES_EQUAL(LEFT, RIGHT) \ do { \ - if (!LEFT.ApproxEquals(RIGHT)) { \ + if (!(LEFT).ApproxEquals(RIGHT)) { \ std::stringstream ss; \ ss << "Left:\n"; \ ASSERT_OK(PrettyPrint(LEFT, 0, &ss)); \ diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 70f275c..8dcc159 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -498,9 +498,9 @@ class ARROW_EXPORT StructType : public NestedType { std::vector<BufferDescr> GetBufferLayout() const override; }; -class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType { +class ARROW_EXPORT DecimalType : public FixedSizeBinaryType { public: - explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t scale) + explicit DecimalType(int32_t byte_width, int32_t precision, int32_t scale) : FixedSizeBinaryType(byte_width, Type::DECIMAL), precision_(precision), scale_(scale) {} @@ -513,21 +513,18 @@ class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType { int32_t scale_; }; -class ARROW_EXPORT Decimal128Type : public DecimalBaseType { +class ARROW_EXPORT Decimal128Type : public DecimalType { public: static constexpr Type::type type_id = Type::DECIMAL; explicit Decimal128Type(int32_t precision, int32_t scale) - : DecimalBaseType(16, precision, scale) {} + : DecimalType(16, precision, scale) {} Status 
Accept(TypeVisitor* visitor) const override; std::string ToString() const override; std::string name() const override { return "decimal"; } }; -// TODO(wesm): Remove this -using DecimalType = Decimal128Type; - struct UnionMode { enum type { SPARSE, DENSE }; }; diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index dbfd89c..73e34c7 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -403,8 +403,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CChunkedArray] data() cdef cppclass CRecordBatch" arrow::RecordBatch": - CRecordBatch(const shared_ptr[CSchema]& schema, int64_t num_rows, - const vector[shared_ptr[CArray]]& columns) + @staticmethod + shared_ptr[CRecordBatch] Make( + const shared_ptr[CSchema]& schema, int64_t num_rows, + const vector[shared_ptr[CArray]]& columns) c_bool Equals(const CRecordBatch& other) @@ -428,6 +430,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: const vector[shared_ptr[CColumn]]& columns) @staticmethod + shared_ptr[CTable] Make( + const shared_ptr[CSchema]& schema, + const vector[shared_ptr[CColumn]]& columns) + + @staticmethod CStatus FromRecordBatches( const vector[shared_ptr[CRecordBatch]]& batches, shared_ptr[CTable]* table) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 591f329..8c5b8bb 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -724,7 +724,6 @@ cdef class RecordBatch: Array arr c_string c_name shared_ptr[CSchema] schema - shared_ptr[CRecordBatch] batch vector[shared_ptr[CArray]] c_arrays int64_t num_rows int64_t i @@ -740,8 +739,8 @@ cdef class RecordBatch: for arr in arrays: c_arrays.push_back(arr.sp_array) - batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) - return pyarrow_wrap_batch(batch) + return pyarrow_wrap_batch( + CRecordBatch.Make(schema, num_rows, c_arrays)) def table_to_blocks(PandasOptions options, Table table, int nthreads, @@ -946,8 +945,7 
@@ cdef class Table: else: raise ValueError(type(arrays[i])) - table.reset(new CTable(c_schema, columns)) - return pyarrow_wrap_table(table) + return pyarrow_wrap_table(CTable.Make(c_schema, columns)) @staticmethod def from_batches(batches): -- To stop receiving notification emails like this one, please contact ['"commits@arrow.apache.org" <commits@arrow.apache.org>'].