[
https://issues.apache.org/jira/browse/ARROW-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261710#comment-16261710
]
ASF GitHub Bot commented on ARROW-1808:
---------------------------------------
wesm closed pull request #1337: ARROW-1808: [C++] Make RecordBatch, Table
virtual interfaces for column access
URL: https://github.com/apache/arrow/pull/1337
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/.travis.yml b/.travis.yml
index 9c714a689..ddadf739a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,6 +55,7 @@ matrix:
- export ARROW_TRAVIS_VALGRIND=1
- export ARROW_TRAVIS_PLASMA=1
- export ARROW_TRAVIS_CLANG_FORMAT=1
+ - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
- export CC="clang-4.0"
- export CXX="clang++-4.0"
- $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh
@@ -74,6 +75,7 @@ matrix:
before_script:
- export ARROW_TRAVIS_USE_TOOLCHAIN=1
- export ARROW_TRAVIS_PLASMA=1
+ - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
- travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
diff --git a/c_glib/arrow-glib/record-batch.cpp
b/c_glib/arrow-glib/record-batch.cpp
index f381af0a2..f23a0cf75 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -150,9 +150,8 @@ garrow_record_batch_new(GArrowSchema *schema,
}
auto arrow_record_batch =
- std::make_shared<arrow::RecordBatch>(garrow_schema_get_raw(schema),
- n_rows,
- arrow_columns);
+ arrow::RecordBatch::Make(garrow_schema_get_raw(schema),
+ n_rows, arrow_columns);
return garrow_record_batch_new_raw(&arrow_record_batch);
}
diff --git a/c_glib/arrow-glib/table.cpp b/c_glib/arrow-glib/table.cpp
index 779f2ef62..e086396f8 100644
--- a/c_glib/arrow-glib/table.cpp
+++ b/c_glib/arrow-glib/table.cpp
@@ -143,8 +143,7 @@ garrow_table_new(GArrowSchema *schema,
}
auto arrow_table =
- std::make_shared<arrow::Table>(garrow_schema_get_raw(schema),
- arrow_columns);
+ arrow::Table::Make(garrow_schema_get_raw(schema), arrow_columns);
return garrow_table_new_raw(&arrow_table);
}
diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb
index 3de8e5cf3..67aed85f7 100644
--- a/c_glib/test/test-file-writer.rb
+++ b/c_glib/test/test-file-writer.rb
@@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase
include Helper::Buildable
def test_write_record_batch
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-ipc-file-writer")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +42,12 @@ def test_write_record_batch
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-gio-input-stream.rb
b/c_glib/test/test-gio-input-stream.rb
index a71a37043..2adf25b3a 100644
--- a/c_glib/test/test-gio-input-stream.rb
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -16,15 +16,21 @@
# under the License.
class TestGIOInputStream < Test::Unit::TestCase
+ include Helper::Buildable
+
def test_reader_backend
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-gio-input-stream")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +44,12 @@ def test_reader_backend
input = Arrow::GIOInputStream.new(input_stream)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-gio-output-stream.rb
b/c_glib/test/test-gio-output-stream.rb
index adaa8c1b7..c77598ed1 100644
--- a/c_glib/test/test-gio-output-stream.rb
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -16,17 +16,23 @@
# under the License.
class TestGIOOutputStream < Test::Unit::TestCase
+ include Helper::Buildable
+
def test_writer_backend
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-gio-output-stream")
file = Gio::File.new_for_path(tempfile.path)
output_stream = file.append_to(:none)
output = Arrow::GIOOutputStream.new(output_stream)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
begin
- record_batch = Arrow::RecordBatch.new(schema, 0, [])
+ record_batch = Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)])
file_writer.write_record_batch(record_batch)
ensure
file_writer.close
@@ -38,8 +44,12 @@ def test_writer_backend
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
file_reader = Arrow::RecordBatchFileReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
file_reader.schema.fields.collect(&:name))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ file_reader.read_record_batch(0))
ensure
input.close
end
diff --git a/c_glib/test/test-stream-writer.rb
b/c_glib/test/test-stream-writer.rb
index c3d0e1490..32754e208 100644
--- a/c_glib/test/test-stream-writer.rb
+++ b/c_glib/test/test-stream-writer.rb
@@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase
include Helper::Buildable
def test_write_record_batch
+ data = [true]
+ field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+ schema = Arrow::Schema.new([field])
+
tempfile = Tempfile.open("arrow-ipc-stream-writer")
output = Arrow::FileOutputStream.new(tempfile.path, false)
begin
- field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
- schema = Arrow::Schema.new([field])
stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema)
begin
columns = [
- build_boolean_array([true]),
+ build_boolean_array(data),
]
- record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+ record_batch = Arrow::RecordBatch.new(schema, data.size, columns)
stream_writer.write_record_batch(record_batch)
ensure
stream_writer.close
@@ -41,10 +43,12 @@ def test_write_record_batch
input = Arrow::MemoryMappedInputStream.new(tempfile.path)
begin
stream_reader = Arrow::RecordBatchStreamReader.new(input)
- assert_equal(["enabled"],
+ assert_equal([field.name],
stream_reader.schema.fields.collect(&:name))
- assert_equal(true,
- stream_reader.read_next.get_column(0).get_value(0))
+ assert_equal(Arrow::RecordBatch.new(schema,
+ data.size,
+ [build_boolean_array(data)]),
+ stream_reader.read_next)
assert_nil(stream_reader.read_next)
ensure
input.close
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 4998f190f..664f7ce5f 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -91,12 +91,14 @@ fi
if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake $CMAKE_COMMON_FLAGS \
$CMAKE_LINUX_FLAGS \
- -DBUILD_WARNING_LEVEL=CHECKIN \
+ -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+ -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
$ARROW_CPP_DIR
else
cmake $CMAKE_COMMON_FLAGS \
$CMAKE_OSX_FLAGS \
- -DBUILD_WARNING_LEVEL=CHECKIN \
+ -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+ -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
$ARROW_CPP_DIR
fi
diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh
index 52c7da4e0..21b6e266e 100755
--- a/ci/travis_env_common.sh
+++ b/ci/travis_env_common.sh
@@ -38,6 +38,9 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
export CMAKE_EXPORT_COMPILE_COMMANDS=1
+export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug}
+export ARROW_BUILD_WARNING_LEVEL=${ARROW_BUILD_WARNING_LEVEL:=Production}
+
if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then
# C++ toolchain
export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 603201bcc..5f7b0a9a1 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -63,6 +63,7 @@ cmake -GNinja \
-DARROW_BUILD_UTILITIES=off \
-DARROW_PLASMA=on \
-DARROW_PYTHON=on \
+ -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
-DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
$ARROW_CPP_DIR
@@ -78,6 +79,8 @@ if [ "$PYTHON_VERSION" == "2.7" ]; then
pip install futures
fi
+export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
+
pip install -r requirements.txt
python setup.py build_ext --with-parquet --with-plasma \
install --single-version-externally-managed --record=record.text
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 496e0da9d..94705781f 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -22,6 +22,7 @@ set(ARROW_SRCS
compare.cc
memory_pool.cc
pretty_print.cc
+ record_batch.cc
status.cc
table.cc
table_builder.cc
@@ -144,6 +145,7 @@ install(FILES
compare.h
memory_pool.h
pretty_print.h
+ record_batch.h
status.h
table.h
table_builder.h
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index 5d2e859f3..7cae8414a 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -26,6 +26,7 @@
#include "arrow/compare.h"
#include "arrow/memory_pool.h"
#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 28756a6ab..dda9dd38b 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -279,6 +279,8 @@ class ARROW_EXPORT Array {
ARROW_DISALLOW_COPY_AND_ASSIGN(Array);
};
+using ArrayVector = std::vector<std::shared_ptr<Array>>;
+
static inline std::ostream& operator<<(std::ostream& os, const Array& x) {
os << x.ToString();
return os;
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 3e213fcd5..a42f90245 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -28,7 +28,6 @@
#include "arrow/buffer.h"
#include "arrow/compare.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 32741b53a..e59e16658 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -29,7 +29,6 @@
#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/column-benchmark.cc
b/cpp/src/arrow/column-benchmark.cc
index e50ddf6d7..af2c368c3 100644
--- a/cpp/src/arrow/column-benchmark.cc
+++ b/cpp/src/arrow/column-benchmark.cc
@@ -19,6 +19,7 @@
#include "arrow/array.h"
#include "arrow/memory_pool.h"
+#include "arrow/table.h"
#include "arrow/test-util.h"
namespace arrow {
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 0037245d6..e160d9c80 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -22,6 +22,7 @@
#include <vector>
#include "arrow/array.h"
+#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/util/macros.h"
#include "arrow/util/variant.h"
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 022268e03..a7262c8b4 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -27,8 +27,8 @@
#include "arrow/ipc/message.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/util/visibility.h"
#include "arrow/gpu/cuda_context.h"
diff --git a/cpp/src/arrow/ipc/feather-test.cc
b/cpp/src/arrow/ipc/feather-test.cc
index 6bd16462d..e3de17f1f 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -29,6 +29,7 @@
#include "arrow/ipc/feather.h"
#include "arrow/ipc/test-common.h"
#include "arrow/pretty_print.h"
+#include "arrow/table.h"
#include "arrow/test-util.h"
namespace arrow {
@@ -376,8 +377,8 @@ TEST_F(TestTableWriter, TimeTypes) {
schema->field(i)->type(), values->length(), buffers,
values->null_count(), 0));
}
- RecordBatch batch(schema, values->length(), std::move(arrays));
- CheckBatch(batch);
+ auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays));
+ CheckBatch(*batch);
}
TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) {
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index cea720bd0..077dc3930 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -32,6 +32,7 @@
#include "arrow/ipc/feather-internal.h"
#include "arrow/ipc/feather_generated.h"
#include "arrow/ipc/util.h" // IWYU pragma: keep
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc
b/cpp/src/arrow/ipc/ipc-json-test.cc
index a560f09d6..e496826f9 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -31,8 +31,8 @@
#include "arrow/ipc/json.h"
#include "arrow/ipc/test-common.h"
#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
@@ -269,7 +269,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
std::vector<std::shared_ptr<Array>> arrays;
MakeBatchArrays(schema, num_rows, &arrays);
- auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+ auto batch = RecordBatch::Make(schema, num_rows, arrays);
batches.push_back(batch);
ASSERT_OK(writer->WriteRecordBatch(*batch));
}
diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
index 9ed0abde6..8561fb860 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
@@ -63,7 +63,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t
total_size, int64_t num_fie
}
auto schema = std::make_shared<Schema>(fields);
- return std::make_shared<RecordBatch>(schema, length, arrays);
+ return RecordBatch::Make(schema, length, arrays);
}
static void BM_WriteRecordBatch(benchmark::State& state) { // NOLINT
non-const reference
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc
b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 40cd3f0ee..1fcbdac5e 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -197,8 +197,8 @@ class IpcTestFixture : public io::MemoryMapFixture {
std::vector<std::shared_ptr<Field>> fields = {f0};
auto schema = std::make_shared<Schema>(fields);
- RecordBatch batch(schema, 0, {array});
- CheckRoundtrip(batch, buffer_size);
+ auto batch = RecordBatch::Make(schema, 0, {array});
+ CheckRoundtrip(*batch, buffer_size);
}
protected:
@@ -292,13 +292,13 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
auto CheckArray = [this](const std::shared_ptr<Array>& array) {
auto f0 = field("f0", array->type());
auto schema = ::arrow::schema({f0});
- RecordBatch batch(schema, array->length(), {array});
- auto sliced_batch = batch.Slice(0, 5);
+ auto batch = RecordBatch::Make(schema, array->length(), {array});
+ auto sliced_batch = batch->Slice(0, 5);
int64_t full_size;
int64_t sliced_size;
- ASSERT_OK(GetRecordBatchSize(batch, &full_size));
+ ASSERT_OK(GetRecordBatchSize(*batch, &full_size));
ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size));
ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size;
@@ -411,8 +411,7 @@ class RecursionLimits : public ::testing::Test, public
io::MemoryMapFixture {
*schema = ::arrow::schema({f0});
- std::vector<std::shared_ptr<Array>> arrays = {array};
- *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+ *batch = RecordBatch::Make(*schema, batch_length, {array});
std::stringstream ss;
ss << "test-write-past-max-recursion-" << g_file_number++;
@@ -632,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
std::vector<std::shared_ptr<Field>> fields = {f0};
auto schema = std::make_shared<Schema>(fields);
- RecordBatch batch(schema, length, {array});
+ auto batch = RecordBatch::Make(schema, length, {array});
std::string path = "test-write-large-record_batch";
@@ -641,8 +640,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_));
std::shared_ptr<RecordBatch> result;
- ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
- CheckReadResult(*result, batch);
+ ASSERT_OK(DoLargeRoundTrip(*batch, false, &result));
+ CheckReadResult(*result, *batch);
ASSERT_EQ(length, result->num_rows());
}
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc
b/cpp/src/arrow/ipc/json-integration-test.cc
index c7530a467..f487487df 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -34,8 +34,8 @@
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
diff --git a/cpp/src/arrow/ipc/json-internal.cc
b/cpp/src/arrow/ipc/json-internal.cc
index bdf1ef52b..bfb3d282d 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -28,8 +28,8 @@
#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/ipc/dictionary.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
@@ -125,8 +125,8 @@ class SchemaWriter {
// Make a dummy record batch. A bit tedious as we have to make a schema
auto schema = ::arrow::schema({arrow::field("dictionary",
dictionary->type())});
- RecordBatch batch(schema, dictionary->length(), {dictionary});
- RETURN_NOT_OK(WriteRecordBatch(batch, writer_));
+ auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+ RETURN_NOT_OK(WriteRecordBatch(*batch, writer_));
writer_->EndObject();
return Status::OK();
}
@@ -1435,7 +1435,7 @@ Status ReadRecordBatch(const rj::Value& json_obj, const
std::shared_ptr<Schema>&
RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i]));
}
- *batch = std::make_shared<RecordBatch>(schema, num_rows, columns);
+ *batch = RecordBatch::Make(schema, num_rows, columns);
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index 30a1bb81e..ea2947d5d 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -24,8 +24,8 @@
#include "arrow/buffer.h"
#include "arrow/ipc/json-internal.h"
#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8e10d7d66..5960e8188 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -37,8 +37,8 @@
#include "arrow/ipc/message.h"
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/util.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/tensor.h"
#include "arrow/type.h"
#include "arrow/util/bit-util.h"
@@ -307,7 +307,7 @@ static Status LoadRecordBatchFromSource(const
std::shared_ptr<Schema>& schema,
arrays[i] = std::move(arr);
}
- *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));
+ *out = RecordBatch::Make(schema, num_rows, std::move(arrays));
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 7581fbda5..627f67e25 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -24,13 +24,12 @@
#include <memory>
#include "arrow/ipc/message.h"
-#include "arrow/table.h"
+#include "arrow/record_batch.h"
#include "arrow/util/visibility.h"
namespace arrow {
class Buffer;
-class RecordBatch;
class Schema;
class Status;
class Tensor;
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 7fc139381..6f8a0dcc6 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -30,8 +30,8 @@
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
#include "arrow/util/bit-util.h"
@@ -184,7 +184,7 @@ Status MakeBooleanBatchSized(const int length,
std::shared_ptr<RecordBatch>* out
std::shared_ptr<Array> a0, a1;
RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0));
RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1));
- out->reset(new RecordBatch(schema, length, {a0, a1}));
+ *out = RecordBatch::Make(schema, length, {a0, a1});
return Status::OK();
}
@@ -203,7 +203,7 @@ Status MakeIntBatchSized(int length,
std::shared_ptr<RecordBatch>* out) {
MemoryPool* pool = default_memory_pool();
RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
- out->reset(new RecordBatch(schema, length, {a0, a1}));
+ *out = RecordBatch::Make(schema, length, {a0, a1});
return Status::OK();
}
@@ -252,7 +252,7 @@ Status
MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, true, pool,
&a1);
RETURN_NOT_OK(s);
}
- out->reset(new RecordBatch(schema, length, {a0, a1}));
+ *out = RecordBatch::Make(schema, length, {a0, a1});
return Status::OK();
}
@@ -261,7 +261,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>*
out) {
auto f0 = field("f0", null());
auto schema = ::arrow::schema({f0});
std::shared_ptr<Array> a0 = std::make_shared<NullArray>(length);
- out->reset(new RecordBatch(schema, length, {a0}));
+ *out = RecordBatch::Make(schema, length, {a0});
return Status::OK();
}
@@ -284,7 +284,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>*
out) {
RETURN_NOT_OK(
MakeRandomListArray(list_array, length, include_nulls, pool,
&list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool,
&flat_array));
- out->reset(new RecordBatch(schema, length, {list_array, list_list_array,
flat_array}));
+ *out = RecordBatch::Make(schema, length, {list_array, list_list_array,
flat_array});
return Status::OK();
}
@@ -304,7 +304,7 @@ Status
MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
RETURN_NOT_OK(
MakeRandomListArray(list_array, 0, include_nulls, pool,
&list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
- out->reset(new RecordBatch(schema, 0, {list_array, list_list_array,
flat_array}));
+ *out = RecordBatch::Make(schema, 0, {list_array, list_list_array,
flat_array});
return Status::OK();
}
@@ -327,7 +327,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>*
out) {
RETURN_NOT_OK(
MakeRandomListArray(list_array, length, include_nulls, pool,
&list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool,
&flat_array));
- out->reset(new RecordBatch(schema, length, {list_array, list_list_array,
flat_array}));
+ *out = RecordBatch::Make(schema, length, {list_array, list_list_array,
flat_array});
return Status::OK();
}
@@ -347,7 +347,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>*
out) {
auto f0 = field("f0", type);
auto schema = ::arrow::schema({f0});
std::vector<std::shared_ptr<Array>> arrays = {array};
- out->reset(new RecordBatch(schema, batch_length, arrays));
+ *out = RecordBatch::Make(schema, batch_length, arrays);
return Status::OK();
}
@@ -377,7 +377,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
// construct batch
std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls};
- out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays));
+ *out = RecordBatch::Make(schema, list_batch->num_rows(), arrays);
return Status::OK();
}
@@ -445,7 +445,7 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
// construct batch
std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse,
dense};
- out->reset(new RecordBatch(schema, length, arrays));
+ *out = RecordBatch::Make(schema, length, arrays);
return Status::OK();
}
@@ -526,7 +526,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4};
- out->reset(new RecordBatch(schema, length, arrays));
+ *out = RecordBatch::Make(schema, length, arrays);
return Status::OK();
}
@@ -564,7 +564,7 @@ Status MakeDictionaryFlat(std::shared_ptr<RecordBatch>*
out) {
{field("dict1", f0_type), field("sparse", f1_type), field("dense",
f2_type)});
std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2};
- out->reset(new RecordBatch(schema, length, arrays));
+ *out = RecordBatch::Make(schema, length, arrays);
return Status::OK();
}
@@ -584,8 +584,7 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Array> date64_array;
ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array);
- std::vector<std::shared_ptr<Array>> arrays = {date32_array, date64_array};
- *out = std::make_shared<RecordBatch>(schema, date32_array->length(), arrays);
+ *out = RecordBatch::Make(schema, date32_array->length(), {date32_array,
date64_array});
return Status::OK();
}
@@ -604,8 +603,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) {
ArrayFromVector<TimestampType, int64_t>(f1->type(), is_valid, ts_values,
&a1);
ArrayFromVector<TimestampType, int64_t>(f2->type(), is_valid, ts_values,
&a2);
- ArrayVector arrays = {a0, a1, a2};
- *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+ *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2});
return Status::OK();
}
@@ -628,8 +626,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) {
ArrayFromVector<Time32Type, int32_t>(f2->type(), is_valid, t32_values, &a2);
ArrayFromVector<Time64Type, int64_t>(f3->type(), is_valid, t64_values, &a3);
- ArrayVector arrays = {a0, a1, a2, a3};
- *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+ *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2, a3});
return Status::OK();
}
@@ -665,8 +662,7 @@ Status MakeFWBinary(std::shared_ptr<RecordBatch>* out) {
RETURN_NOT_OK(b1.Finish(&a1));
RETURN_NOT_OK(b2.Finish(&a2));
- ArrayVector arrays = {a1, a2};
- *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+ *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
return Status::OK();
}
@@ -695,8 +691,7 @@ Status MakeDecimal(std::shared_ptr<RecordBatch>* out) {
auto a2 = std::make_shared<Decimal128Array>(f1->type(), length, data);
- ArrayVector arrays = {a1, a2};
- *out = std::make_shared<RecordBatch>(schema, length, arrays);
+ *out = RecordBatch::Make(schema, length, {a1, a2});
return Status::OK();
}
@@ -716,8 +711,7 @@ Status MakeNull(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Array> a2;
ArrayFromVector<Int64Type, int64_t>(f1->type(), is_valid, int_values, &a2);
- ArrayVector arrays = {a1, a2};
- *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+ *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 323116f58..3c1db0615 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -32,6 +32,7 @@
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/util.h"
#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/tensor.h"
@@ -508,12 +509,9 @@ class DictionaryWriter : public RecordBatchSerializer {
dictionary_id_ = dictionary_id;
// Make a dummy record batch. A bit tedious as we have to make a schema
- std::vector<std::shared_ptr<Field>> fields = {
- arrow::field("dictionary", dictionary->type())};
- auto schema = std::make_shared<Schema>(fields);
- RecordBatch batch(schema, dictionary->length(), {dictionary});
-
- return RecordBatchSerializer::Write(batch, dst, metadata_length,
body_length);
+ auto schema = arrow::schema({arrow::field("dictionary",
dictionary->type())});
+ auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+ return RecordBatchSerializer::Write(*batch, dst, metadata_length,
body_length);
}
private:
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index cfbc30315..bd5f8ce10 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -22,8 +22,8 @@
#include "arrow/array.h"
#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/python/python-test.cc
b/cpp/src/arrow/python/python-test.cc
index 86391a185..3b7d7d884 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -23,6 +23,7 @@
#include "arrow/array.h"
#include "arrow/builder.h"
+#include "arrow/table.h"
#include "arrow/test-util.h"
#include "arrow/python/arrow_to_pandas.h"
@@ -81,8 +82,8 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
std::vector<std::shared_ptr<Field>> fields = {f1, f2, f3};
std::vector<std::shared_ptr<Array>> cols = {arr, arr, arr};
- auto schema = std::make_shared<Schema>(fields);
- auto table = std::make_shared<Table>(schema, cols);
+ auto schema = ::arrow::schema(fields);
+ auto table = Table::Make(schema, cols);
PyObject* out;
Py_BEGIN_ALLOW_THREADS;
diff --git a/cpp/src/arrow/python/python_to_arrow.cc
b/cpp/src/arrow/python/python_to_arrow.cc
index b0c6287f0..72cc5b6e1 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -32,13 +32,15 @@
#include "arrow/builder.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/tensor.h"
+#include "arrow/util/logging.h"
+
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/platform.h"
#include "arrow/python/util/datetime.h"
-#include "arrow/tensor.h"
-#include "arrow/util/logging.h"
constexpr int32_t kMaxRecursionDepth = 100;
@@ -694,7 +696,7 @@ Status SerializeDict(PyObject* context,
std::vector<PyObject*> dicts,
std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
auto field = std::make_shared<Field>("list", data->type());
auto schema = ::arrow::schema({field});
- return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(),
{data}));
+ return RecordBatch::Make(schema, data->length(), {data});
}
Status SerializeObject(PyObject* context, PyObject* sequence,
SerializedPyObject* out) {
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
new file mode 100644
index 000000000..60932bdf3
--- /dev/null
+++ b/cpp/src/arrow/record_batch.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/record_batch.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+/// \class SimpleRecordBatch
+/// \brief A basic, non-lazy in-memory record batch
+class SimpleRecordBatch : public RecordBatch {
+ public:
+ SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<Array>>& columns)
+ : RecordBatch(schema, num_rows) {
+ columns_.resize(columns.size());
+ boxed_columns_.resize(schema->num_fields());
+ for (size_t i = 0; i < columns.size(); ++i) {
+ columns_[i] = columns[i]->data();
+ }
+ }
+
+ SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ std::vector<std::shared_ptr<Array>>&& columns)
+ : RecordBatch(schema, num_rows) {
+ columns_.resize(columns.size());
+ boxed_columns_.resize(schema->num_fields());
+ for (size_t i = 0; i < columns.size(); ++i) {
+ columns_[i] = columns[i]->data();
+ }
+ }
+
+ SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ std::vector<std::shared_ptr<ArrayData>>&& columns)
+ : RecordBatch(schema, num_rows) {
+ columns_ = std::move(columns);
+ boxed_columns_.resize(schema->num_fields());
+ }
+
+ SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<ArrayData>>& columns)
+ : RecordBatch(schema, num_rows) {
+ columns_ = columns;
+ boxed_columns_.resize(schema->num_fields());
+ }
+
+ std::shared_ptr<Array> column(int i) const override {
+ if (!boxed_columns_[i]) {
+ boxed_columns_[i] = MakeArray(columns_[i]);
+ }
+ DCHECK(boxed_columns_[i]);
+ return boxed_columns_[i];
+ }
+
+ std::shared_ptr<ArrayData> column_data(int i) const override { return
columns_[i]; }
+
+ std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+ auto new_schema = schema_->AddMetadata(metadata);
+ return RecordBatch::Make(new_schema, num_rows_, columns_);
+ }
+
+ std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const
override {
+ std::vector<std::shared_ptr<ArrayData>> arrays;
+ arrays.reserve(num_columns());
+ for (const auto& field : columns_) {
+ int64_t col_length = std::min(field->length - offset, length);
+ int64_t col_offset = field->offset + offset;
+
+ auto new_data = std::make_shared<ArrayData>(*field);
+ new_data->length = col_length;
+ new_data->offset = col_offset;
+ new_data->null_count = kUnknownNullCount;
+ arrays.emplace_back(new_data);
+ }
+ int64_t num_rows = std::min(num_rows_ - offset, length);
+ return std::make_shared<SimpleRecordBatch>(schema_, num_rows,
std::move(arrays));
+ }
+
+ Status Validate() const override {
+ if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+ return Status::Invalid("Number of columns did not match schema");
+ }
+ return RecordBatch::Validate();
+ }
+
+ private:
+ std::vector<std::shared_ptr<ArrayData>> columns_;
+
+ // Caching boxed array data
+ mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
+};
+
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows)
+ : schema_(schema), num_rows_(num_rows) {}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<Array>>& columns) {
+ return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ std::vector<std::shared_ptr<Array>>&& columns) {
+ return std::make_shared<SimpleRecordBatch>(schema, num_rows,
std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ std::vector<std::shared_ptr<ArrayData>>&& columns) {
+ return std::make_shared<SimpleRecordBatch>(schema, num_rows,
std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<ArrayData>>& columns) {
+ return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+const std::string& RecordBatch::column_name(int i) const {
+ return schema_->field(i)->name();
+}
+
+bool RecordBatch::Equals(const RecordBatch& other) const {
+ if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+ return false;
+ }
+
+ for (int i = 0; i < num_columns(); ++i) {
+ if (!column(i)->Equals(other.column(i))) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
+ if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+ return false;
+ }
+
+ for (int i = 0; i < num_columns(); ++i) {
+ if (!column(i)->ApproxEquals(other.column(i))) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
+ return Slice(offset, this->num_rows() - offset);
+}
+
+Status RecordBatch::Validate() const {
+ for (int i = 0; i < num_columns(); ++i) {
+ auto arr_shared = this->column_data(i);
+ const ArrayData& arr = *arr_shared;
+ if (arr.length != num_rows_) {
+ std::stringstream ss;
+ ss << "Number of rows in column " << i << " did not match batch: " <<
arr.length
+ << " vs " << num_rows_;
+ return Status::Invalid(ss.str());
+ }
+ const auto& schema_type = *schema_->field(i)->type();
+ if (!arr.type->Equals(schema_type)) {
+ std::stringstream ss;
+ ss << "Column " << i << " type not match schema: " <<
arr.type->ToString() << " vs "
+ << schema_type.ToString();
+ return Status::Invalid(ss.str());
+ }
+ }
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Base record batch reader
+
+RecordBatchReader::~RecordBatchReader() {}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
new file mode 100644
index 000000000..b2c4c76b3
--- /dev/null
+++ b/cpp/src/arrow/record_batch.h
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_RECORD_BATCH_H
+#define ARROW_RECORD_BATCH_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class KeyValueMetadata;
+class Status;
+
+/// \class RecordBatch
+/// \brief Collection of equal-length arrays matching a particular Schema
+///
+/// A record batch is a table-like data structure that is semantically a sequence
+/// of fields, each a contiguous Arrow array
+class ARROW_EXPORT RecordBatch {
+ public:
+ virtual ~RecordBatch() = default;
+
+ /// \param[in] schema The record batch schema
+ /// \param[in] num_rows length of fields in the record batch. Each array
+ /// should have the same length as num_rows
+ /// \param[in] columns the record batch fields as vector of arrays
+ static std::shared_ptr<RecordBatch> Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<Array>>& columns);
+
+ /// \brief Move-based constructor for a vector of Array instances
+ static std::shared_ptr<RecordBatch> Make(const std::shared_ptr<Schema>&
schema,
+ int64_t num_rows,
+
std::vector<std::shared_ptr<Array>>&& columns);
+
+ /// \brief Construct record batch from vector of internal data structures
+ /// \since 0.5.0
+ ///
+ /// This class is only provided with an rvalue-reference for the input data,
+ /// and is intended for internal use, or advanced users.
+ ///
+ /// \param schema the record batch schema
+ /// \param num_rows the number of semantic rows in the record batch. This
+ /// should be equal to the length of each field
+ /// \param columns the data for the batch's columns
+ static std::shared_ptr<RecordBatch> Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ std::vector<std::shared_ptr<ArrayData>>&& columns);
+
+ /// \brief Construct record batch by copying vector of array data
+ /// \since 0.5.0
+ static std::shared_ptr<RecordBatch> Make(
+ const std::shared_ptr<Schema>& schema, int64_t num_rows,
+ const std::vector<std::shared_ptr<ArrayData>>& columns);
+
+ /// \brief Determine if two record batches are exactly equal
+ /// \return true if batches are equal
+ bool Equals(const RecordBatch& other) const;
+
+ /// \brief Determine if two record batches are approximately equal
+ bool ApproxEquals(const RecordBatch& other) const;
+
+ /// \brief Access the record batch's schema
+ /// \return the schema shared by all columns in the batch
+ std::shared_ptr<Schema> schema() const { return schema_; }
+
+ /// \brief Retrieve an array from the record batch
+ /// \param[in] i field index, does not boundscheck
+ /// \return an Array object
+ virtual std::shared_ptr<Array> column(int i) const = 0;
+
+ /// \brief Retrieve an array's internal data from the record batch
+ /// \param[in] i field index, does not boundscheck
+ /// \return an internal ArrayData object
+ virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;
+
+ virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+ /// \brief Name of the i-th column
+ const std::string& column_name(int i) const;
+
+ /// \return the number of columns in the record batch
+ int num_columns() const { return schema_->num_fields(); }
+
+ /// \return the number of rows (the corresponding length of each column)
+ int64_t num_rows() const { return num_rows_; }
+
+ /// \brief Slice each of the arrays in the record batch
+ /// \param[in] offset the starting offset to slice, through end of batch
+ /// \return new record batch
+ virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
+
+ /// \brief Slice each of the arrays in the record batch
+ /// \param[in] offset the starting offset to slice
+ /// \param[in] length the number of elements to slice from offset
+ /// \return new record batch
+ virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length)
const = 0;
+
+ /// \brief Check for schema or length inconsistencies
+ /// \return Status
+ virtual Status Validate() const;
+
+ protected:
+ RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
+
+ std::shared_ptr<Schema> schema_;
+ int64_t num_rows_;
+
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
+};
+
+/// \brief Abstract interface for reading stream of record batches
+class ARROW_EXPORT RecordBatchReader {
+ public:
+ virtual ~RecordBatchReader();
+
+ /// \return the shared schema of the record batches in the stream
+ virtual std::shared_ptr<Schema> schema() const = 0;
+
+ /// Read the next record batch in the stream. Return null for batch when
+ /// reaching end of stream
+ ///
+ /// \param[out] batch the next loaded batch, null at end of stream
+ /// \return Status
+ virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+};
+
+} // namespace arrow
+
+#endif // ARROW_RECORD_BATCH_H
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index b490310c2..e77d3aa8b 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -22,6 +22,7 @@
#include "gtest/gtest.h"
#include "arrow/array.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/test-common.h"
@@ -216,8 +217,8 @@ class TestTable : public TestBase {
TEST_F(TestTable, EmptySchema) {
auto empty_schema = ::arrow::schema({});
- table_.reset(new Table(empty_schema, columns_));
- ASSERT_OK(table_->ValidateColumns());
+ table_ = Table::Make(empty_schema, columns_);
+ ASSERT_OK(table_->Validate());
ASSERT_EQ(0, table_->num_rows());
ASSERT_EQ(0, table_->num_columns());
}
@@ -226,20 +227,20 @@ TEST_F(TestTable, Ctors) {
const int length = 100;
MakeExample1(length);
- table_.reset(new Table(schema_, columns_));
- ASSERT_OK(table_->ValidateColumns());
+ table_ = Table::Make(schema_, columns_);
+ ASSERT_OK(table_->Validate());
ASSERT_EQ(length, table_->num_rows());
ASSERT_EQ(3, table_->num_columns());
- auto array_ctor = std::make_shared<Table>(schema_, arrays_);
+ auto array_ctor = Table::Make(schema_, arrays_);
ASSERT_TRUE(table_->Equals(*array_ctor));
- table_.reset(new Table(schema_, columns_, length));
- ASSERT_OK(table_->ValidateColumns());
+ table_ = Table::Make(schema_, columns_, length);
+ ASSERT_OK(table_->Validate());
ASSERT_EQ(length, table_->num_rows());
- ASSERT_OK(MakeTable(schema_, arrays_, &table_));
- ASSERT_OK(table_->ValidateColumns());
+ table_ = Table::Make(schema_, arrays_);
+ ASSERT_OK(table_->Validate());
ASSERT_EQ(length, table_->num_rows());
ASSERT_EQ(3, table_->num_columns());
}
@@ -248,7 +249,7 @@ TEST_F(TestTable, Metadata) {
const int length = 100;
MakeExample1(length);
- table_.reset(new Table(schema_, columns_));
+ table_ = Table::Make(schema_, columns_);
ASSERT_TRUE(table_->schema()->Equals(*schema_));
@@ -262,14 +263,14 @@ TEST_F(TestTable, InvalidColumns) {
const int length = 100;
MakeExample1(length);
- table_.reset(new Table(schema_, columns_, length - 1));
- ASSERT_RAISES(Invalid, table_->ValidateColumns());
+ table_ = Table::Make(schema_, columns_, length - 1);
+ ASSERT_RAISES(Invalid, table_->Validate());
columns_.clear();
// Wrong number of columns
- table_.reset(new Table(schema_, columns_, length));
- ASSERT_RAISES(Invalid, table_->ValidateColumns());
+ table_ = Table::Make(schema_, columns_, length);
+ ASSERT_RAISES(Invalid, table_->Validate());
columns_ = {
std::make_shared<Column>(schema_->field(0),
MakeRandomArray<Int32Array>(length)),
@@ -277,15 +278,15 @@ TEST_F(TestTable, InvalidColumns) {
std::make_shared<Column>(schema_->field(2),
MakeRandomArray<Int16Array>(length - 1))};
- table_.reset(new Table(schema_, columns_, length));
- ASSERT_RAISES(Invalid, table_->ValidateColumns());
+ table_ = Table::Make(schema_, columns_, length);
+ ASSERT_RAISES(Invalid, table_->Validate());
}
TEST_F(TestTable, Equals) {
const int length = 100;
MakeExample1(length);
- table_.reset(new Table(schema_, columns_));
+ table_ = Table::Make(schema_, columns_);
ASSERT_TRUE(table_->Equals(*table_));
// Differing schema
@@ -294,7 +295,8 @@ TEST_F(TestTable, Equals) {
auto f2 = field("f5", int16());
vector<shared_ptr<Field>> fields = {f0, f1, f2};
auto other_schema = std::make_shared<Schema>(fields);
- ASSERT_FALSE(table_->Equals(Table(other_schema, columns_)));
+ auto other = Table::Make(other_schema, columns_);
+ ASSERT_FALSE(table_->Equals(*other));
// Differing columns
std::vector<std::shared_ptr<Column>> other_columns = {
std::make_shared<Column>(schema_->field(0),
@@ -303,19 +305,21 @@ TEST_F(TestTable, Equals) {
MakeRandomArray<UInt8Array>(length, 10)),
std::make_shared<Column>(schema_->field(2),
MakeRandomArray<Int16Array>(length, 10))};
- ASSERT_FALSE(table_->Equals(Table(schema_, other_columns)));
+
+ other = Table::Make(schema_, other_columns);
+ ASSERT_FALSE(table_->Equals(*other));
}
TEST_F(TestTable, FromRecordBatches) {
const int64_t length = 10;
MakeExample1(length);
- auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+ auto batch1 = RecordBatch::Make(schema_, length, arrays_);
std::shared_ptr<Table> result, expected;
ASSERT_OK(Table::FromRecordBatches({batch1}, &result));
- expected = std::make_shared<Table>(schema_, columns_);
+ expected = Table::Make(schema_, columns_);
ASSERT_TRUE(result->Equals(*expected));
std::vector<std::shared_ptr<Column>> other_columns;
@@ -325,18 +329,17 @@ TEST_F(TestTable, FromRecordBatches) {
}
ASSERT_OK(Table::FromRecordBatches({batch1, batch1}, &result));
- expected = std::make_shared<Table>(schema_, other_columns);
+ expected = Table::Make(schema_, other_columns);
ASSERT_TRUE(result->Equals(*expected));
// Error states
std::vector<std::shared_ptr<RecordBatch>> empty_batches;
ASSERT_RAISES(Invalid, Table::FromRecordBatches(empty_batches, &result));
- std::vector<std::shared_ptr<Field>> fields = {schema_->field(0),
schema_->field(1)};
- auto other_schema = std::make_shared<Schema>(fields);
+ auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
- auto batch2 = std::make_shared<RecordBatch>(other_schema, length,
other_arrays);
+ auto batch2 = RecordBatch::Make(other_schema, length, other_arrays);
ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result));
}
@@ -344,11 +347,11 @@ TEST_F(TestTable, ConcatenateTables) {
const int64_t length = 10;
MakeExample1(length);
- auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+ auto batch1 = RecordBatch::Make(schema_, length, arrays_);
// generate different data
MakeExample1(length);
- auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+ auto batch2 = RecordBatch::Make(schema_, length, arrays_);
std::shared_ptr<Table> t1, t2, t3, result, expected;
ASSERT_OK(Table::FromRecordBatches({batch1}, &t1));
@@ -362,11 +365,10 @@ TEST_F(TestTable, ConcatenateTables) {
std::vector<std::shared_ptr<Table>> empty_tables;
ASSERT_RAISES(Invalid, ConcatenateTables(empty_tables, &result));
- std::vector<std::shared_ptr<Field>> fields = {schema_->field(0),
schema_->field(1)};
- auto other_schema = std::make_shared<Schema>(fields);
+ auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
- auto batch3 = std::make_shared<RecordBatch>(other_schema, length,
other_arrays);
+ auto batch3 = RecordBatch::Make(other_schema, length, other_arrays);
ASSERT_OK(Table::FromRecordBatches({batch3}, &t3));
ASSERT_RAISES(Invalid, ConcatenateTables({t1, t3}, &result));
@@ -376,31 +378,38 @@ TEST_F(TestTable, RemoveColumn) {
const int64_t length = 10;
MakeExample1(length);
- Table table(schema_, columns_);
+ auto table_sp = Table::Make(schema_, columns_);
+ const Table& table = *table_sp;
std::shared_ptr<Table> result;
ASSERT_OK(table.RemoveColumn(0, &result));
auto ex_schema = ::arrow::schema({schema_->field(1), schema_->field(2)});
std::vector<std::shared_ptr<Column>> ex_columns = {table.column(1),
table.column(2)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+ auto expected = Table::Make(ex_schema, ex_columns);
+ ASSERT_TRUE(result->Equals(*expected));
ASSERT_OK(table.RemoveColumn(1, &result));
ex_schema = ::arrow::schema({schema_->field(0), schema_->field(2)});
ex_columns = {table.column(0), table.column(2)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+ expected = Table::Make(ex_schema, ex_columns);
+ ASSERT_TRUE(result->Equals(*expected));
ASSERT_OK(table.RemoveColumn(2, &result));
ex_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
ex_columns = {table.column(0), table.column(1)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+ expected = Table::Make(ex_schema, ex_columns);
+ ASSERT_TRUE(result->Equals(*expected));
}
TEST_F(TestTable, AddColumn) {
const int64_t length = 10;
MakeExample1(length);
- Table table(schema_, columns_);
+ auto table_sp = Table::Make(schema_, columns_);
+ const Table& table = *table_sp;
std::shared_ptr<Table> result;
// Some negative tests with invalid index
@@ -419,50 +428,32 @@ TEST_F(TestTable, AddColumn) {
ASSERT_OK(table.AddColumn(0, columns_[0], &result));
auto ex_schema = ::arrow::schema(
{schema_->field(0), schema_->field(0), schema_->field(1),
schema_->field(2)});
- std::vector<std::shared_ptr<Column>> ex_columns = {table.column(0),
table.column(0),
- table.column(1),
table.column(2)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+ auto expected = Table::Make(
+ ex_schema, {table.column(0), table.column(0), table.column(1),
table.column(2)});
+ ASSERT_TRUE(result->Equals(*expected));
ASSERT_OK(table.AddColumn(1, columns_[0], &result));
ex_schema = ::arrow::schema(
{schema_->field(0), schema_->field(0), schema_->field(1),
schema_->field(2)});
- ex_columns = {table.column(0), table.column(0), table.column(1),
table.column(2)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+ expected = Table::Make(
+ ex_schema, {table.column(0), table.column(0), table.column(1),
table.column(2)});
+ ASSERT_TRUE(result->Equals(*expected));
ASSERT_OK(table.AddColumn(2, columns_[0], &result));
ex_schema = ::arrow::schema(
{schema_->field(0), schema_->field(1), schema_->field(0),
schema_->field(2)});
- ex_columns = {table.column(0), table.column(1), table.column(0),
table.column(2)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+ expected = Table::Make(
+ ex_schema, {table.column(0), table.column(1), table.column(0),
table.column(2)});
+ ASSERT_TRUE(result->Equals(*expected));
ASSERT_OK(table.AddColumn(3, columns_[0], &result));
ex_schema = ::arrow::schema(
{schema_->field(0), schema_->field(1), schema_->field(2),
schema_->field(0)});
- ex_columns = {table.column(0), table.column(1), table.column(2),
table.column(0)};
- ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
-}
-
-TEST_F(TestTable, IsChunked) {
- ArrayVector c1, c2;
-
- auto a1 = MakeRandomArray<Int32Array>(10);
- auto a2 = MakeRandomArray<Int32Array>(20);
-
- auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())});
-
- std::vector<std::shared_ptr<Column>> columns;
-
- std::shared_ptr<RecordBatch> batch;
-
- columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})};
- auto t1 = std::make_shared<Table>(sch1, columns);
-
- ASSERT_FALSE(t1->IsChunked());
-
- columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})};
- auto t2 = std::make_shared<Table>(sch1, columns);
-
- ASSERT_TRUE(t2->IsChunked());
+ expected = Table::Make(
+ ex_schema, {table.column(0), table.column(1), table.column(2),
table.column(0)});
+ ASSERT_TRUE(result->Equals(*expected));
}
class TestRecordBatch : public TestBase {};
@@ -475,24 +466,22 @@ TEST_F(TestRecordBatch, Equals) {
auto f2 = field("f2", int16());
vector<shared_ptr<Field>> fields = {f0, f1, f2};
- auto schema = std::make_shared<Schema>(fields);
+ auto schema = ::arrow::schema({f0, f1, f2});
+ auto schema2 = ::arrow::schema({f0, f1});
auto a0 = MakeRandomArray<Int32Array>(length);
auto a1 = MakeRandomArray<UInt8Array>(length);
auto a2 = MakeRandomArray<Int16Array>(length);
- RecordBatch b1(schema, length, {a0, a1, a2});
- RecordBatch b3(schema, length, {a0, a1});
- RecordBatch b4(schema, length, {a0, a1, a1});
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+ auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+ auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});
- ASSERT_TRUE(b1.Equals(b1));
- ASSERT_FALSE(b1.Equals(b3));
- ASSERT_FALSE(b1.Equals(b4));
+ ASSERT_TRUE(b1->Equals(*b1));
+ ASSERT_FALSE(b1->Equals(*b3));
+ ASSERT_FALSE(b1->Equals(*b4));
}
-#ifdef NDEBUG
-// In debug builds, RecordBatch ctor aborts if you construct an invalid one
-
TEST_F(TestRecordBatch, Validate) {
const int length = 10;
@@ -507,21 +496,19 @@ TEST_F(TestRecordBatch, Validate) {
auto a2 = MakeRandomArray<Int16Array>(length);
auto a3 = MakeRandomArray<Int16Array>(5);
- RecordBatch b1(schema, length, {a0, a1, a2});
+ auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
- ASSERT_OK(b1.Validate());
+ ASSERT_OK(b1->Validate());
// Length mismatch
- RecordBatch b2(schema, length, {a0, a1, a3});
- ASSERT_RAISES(Invalid, b2.Validate());
+ auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3});
+ ASSERT_RAISES(Invalid, b2->Validate());
// Type mismatch
- RecordBatch b3(schema, length, {a0, a1, a0});
- ASSERT_RAISES(Invalid, b3.Validate());
+ auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0});
+ ASSERT_RAISES(Invalid, b3->Validate());
}
-#endif
-
TEST_F(TestRecordBatch, Slice) {
const int length = 10;
@@ -529,19 +516,19 @@ TEST_F(TestRecordBatch, Slice) {
auto f1 = field("f1", uint8());
vector<shared_ptr<Field>> fields = {f0, f1};
- auto schema = std::make_shared<Schema>(fields);
+ auto schema = ::arrow::schema(fields);
auto a0 = MakeRandomArray<Int32Array>(length);
auto a1 = MakeRandomArray<UInt8Array>(length);
- RecordBatch batch(schema, length, {a0, a1});
+ auto batch = RecordBatch::Make(schema, length, {a0, a1});
- auto batch_slice = batch.Slice(2);
- auto batch_slice2 = batch.Slice(1, 5);
+ auto batch_slice = batch->Slice(2);
+ auto batch_slice2 = batch->Slice(1, 5);
- ASSERT_EQ(batch_slice->num_rows(), batch.num_rows() - 2);
+ ASSERT_EQ(batch_slice->num_rows(), batch->num_rows() - 2);
- for (int i = 0; i < batch.num_columns(); ++i) {
+ for (int i = 0; i < batch->num_columns(); ++i) {
ASSERT_EQ(2, batch_slice->column(i)->offset());
ASSERT_EQ(length - 2, batch_slice->column(i)->length());
@@ -567,9 +554,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
std::shared_ptr<RecordBatch> batch;
columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2,
a2})};
- Table t1(sch1, columns);
+ auto t1 = Table::Make(sch1, columns);
- TableBatchReader i1(t1);
+ TableBatchReader i1(*t1);
ASSERT_OK(i1.ReadNext(&batch));
ASSERT_EQ(10, batch->num_rows());
@@ -584,9 +571,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
ASSERT_EQ(nullptr, batch);
columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})};
- Table t2(sch1, columns);
+ auto t2 = Table::Make(sch1, columns);
- TableBatchReader i2(t2);
+ TableBatchReader i2(*t2);
ASSERT_OK(i2.ReadNext(&batch));
ASSERT_EQ(10, batch->num_rows());
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index fe19bf4ce..8f3f19576 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -23,6 +23,7 @@
#include <sstream>
#include "arrow/array.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"
@@ -153,171 +154,126 @@ Status Column::ValidateData() {
}
// ----------------------------------------------------------------------
-// RecordBatch methods
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows)
- : schema_(schema), num_rows_(num_rows) {
- boxed_columns_.resize(schema->num_fields());
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows,
- const std::vector<std::shared_ptr<Array>>& columns)
- : RecordBatch(schema, num_rows) {
- columns_.resize(columns.size());
- for (size_t i = 0; i < columns.size(); ++i) {
- columns_[i] = columns[i]->data();
- }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows,
- std::vector<std::shared_ptr<Array>>&& columns)
- : RecordBatch(schema, num_rows) {
- columns_.resize(columns.size());
- for (size_t i = 0; i < columns.size(); ++i) {
- columns_[i] = columns[i]->data();
- }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows,
- std::vector<std::shared_ptr<ArrayData>>&& columns)
- : RecordBatch(schema, num_rows) {
- columns_ = std::move(columns);
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t
num_rows,
- const std::vector<std::shared_ptr<ArrayData>>&
columns)
- : RecordBatch(schema, num_rows) {
- columns_ = columns;
-}
-
-std::shared_ptr<Array> RecordBatch::column(int i) const {
- if (!boxed_columns_[i]) {
- boxed_columns_[i] = MakeArray(columns_[i]);
- }
- DCHECK(boxed_columns_[i]);
- return boxed_columns_[i];
-}
-
-const std::string& RecordBatch::column_name(int i) const {
- return schema_->field(i)->name();
-}
-
-bool RecordBatch::Equals(const RecordBatch& other) const {
- if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
- return false;
- }
+// Table methods
- for (int i = 0; i < num_columns(); ++i) {
- if (!column(i)->Equals(other.column(i))) {
- return false;
+/// \class SimpleTable
+/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
+class SimpleTable : public Table {
+ public:
+ SimpleTable(const std::shared_ptr<Schema>& schema,
+ const std::vector<std::shared_ptr<Column>>& columns, int64_t
num_rows = -1)
+ : columns_(columns) {
+ schema_ = schema;
+ if (num_rows < 0) {
+ if (columns.size() == 0) {
+ num_rows_ = 0;
+ } else {
+ num_rows_ = columns[0]->length();
+ }
+ } else {
+ num_rows_ = num_rows;
}
}
- return true;
-}
-
-bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
- if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
- return false;
- }
+ SimpleTable(const std::shared_ptr<Schema>& schema,
+ const std::vector<std::shared_ptr<Array>>& columns, int64_t
num_rows = -1) {
+ schema_ = schema;
+ if (num_rows < 0) {
+ if (columns.size() == 0) {
+ num_rows_ = 0;
+ } else {
+ num_rows_ = columns[0]->length();
+ }
+ } else {
+ num_rows_ = num_rows;
+ }
- for (int i = 0; i < num_columns(); ++i) {
- if (!column(i)->ApproxEquals(other.column(i))) {
- return false;
+ columns_.resize(columns.size());
+ for (size_t i = 0; i < columns.size(); ++i) {
+ columns_[i] =
+ std::make_shared<Column>(schema->field(static_cast<int>(i)),
columns[i]);
}
}
- return true;
-}
-
-std::shared_ptr<RecordBatch> RecordBatch::ReplaceSchemaMetadata(
- const std::shared_ptr<const KeyValueMetadata>& metadata) const {
- auto new_schema = schema_->AddMetadata(metadata);
- return std::make_shared<RecordBatch>(new_schema, num_rows_, columns_);
-}
+ std::shared_ptr<Column> column(int i) const override { return columns_[i]; }
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
- return Slice(offset, this->num_rows() - offset);
-}
+ Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override {
+ std::shared_ptr<Schema> new_schema;
+ RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t
length) const {
- std::vector<std::shared_ptr<ArrayData>> arrays;
- arrays.reserve(num_columns());
- for (const auto& field : columns_) {
- int64_t col_length = std::min(field->length - offset, length);
- int64_t col_offset = field->offset + offset;
-
- auto new_data = std::make_shared<ArrayData>(*field);
- new_data->length = col_length;
- new_data->offset = col_offset;
- new_data->null_count = kUnknownNullCount;
- arrays.emplace_back(new_data);
+ *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i));
+ return Status::OK();
}
- int64_t num_rows = std::min(num_rows_ - offset, length);
- return std::make_shared<RecordBatch>(schema_, num_rows, std::move(arrays));
-}
-Status RecordBatch::Validate() const {
- for (int i = 0; i < num_columns(); ++i) {
- const ArrayData& arr = *columns_[i];
- if (arr.length != num_rows_) {
+ Status AddColumn(int i, const std::shared_ptr<Column>& col,
+ std::shared_ptr<Table>* out) const override {
+ if (i < 0 || i > num_columns() + 1) {
+ return Status::Invalid("Invalid column index.");
+ }
+ if (col == nullptr) {
std::stringstream ss;
- ss << "Number of rows in column " << i << " did not match batch: " <<
arr.length
- << " vs " << num_rows_;
+ ss << "Column " << i << " was null";
return Status::Invalid(ss.str());
}
- const auto& schema_type = *schema_->field(i)->type();
- if (!arr.type->Equals(schema_type)) {
+ if (col->length() != num_rows_) {
std::stringstream ss;
- ss << "Column " << i << " type not match schema: " <<
arr.type->ToString() << " vs "
- << schema_type.ToString();
+ ss << "Added column's length must match table's length. Expected length "
+ << num_rows_ << " but got length " << col->length();
return Status::Invalid(ss.str());
}
+
+ std::shared_ptr<Schema> new_schema;
+ RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
+
+ *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i,
col));
+ return Status::OK();
}
- return Status::OK();
-}
-// ----------------------------------------------------------------------
-// Table methods
+ std::shared_ptr<Table> ReplaceSchemaMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+ auto new_schema = schema_->AddMetadata(metadata);
+ return Table::Make(new_schema, columns_);
+ }
-Table::Table(const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<Column>>& columns, int64_t
num_rows)
- : schema_(schema), columns_(columns) {
- if (num_rows < 0) {
- if (columns.size() == 0) {
- num_rows_ = 0;
- } else {
- num_rows_ = columns[0]->length();
+ Status Validate() const override {
+ if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+ return Status::Invalid("Number of columns did not match schema");
}
- } else {
- num_rows_ = num_rows;
- }
-}
-Table::Table(const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<Array>>& columns, int64_t
num_rows)
- : schema_(schema) {
- if (num_rows < 0) {
- if (columns.size() == 0) {
- num_rows_ = 0;
- } else {
- num_rows_ = columns[0]->length();
+ // Make sure columns are all the same length
+ for (int i = 0; i < num_columns(); ++i) {
+ const Column* col = columns_[i].get();
+ if (col == nullptr) {
+ std::stringstream ss;
+ ss << "Column " << i << " was null";
+ return Status::Invalid(ss.str());
+ }
+ if (col->length() != num_rows_) {
+ std::stringstream ss;
+ ss << "Column " << i << " named " << col->name() << " expected length "
+ << num_rows_ << " but got length " << col->length();
+ return Status::Invalid(ss.str());
+ }
}
- } else {
- num_rows_ = num_rows;
+ return Status::OK();
}
- columns_.resize(columns.size());
- for (size_t i = 0; i < columns.size(); ++i) {
- columns_[i] =
- std::make_shared<Column>(schema->field(static_cast<int>(i)),
columns[i]);
- }
+ private:
+ std::vector<std::shared_ptr<Column>> columns_;
+};
+
+Table::Table() {}
+
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+ const std::vector<std::shared_ptr<Column>>&
columns,
+ int64_t num_rows) {
+ return std::make_shared<SimpleTable>(schema, columns, num_rows);
}
-std::shared_ptr<Table> Table::ReplaceSchemaMetadata(
- const std::shared_ptr<const KeyValueMetadata>& metadata) const {
- auto new_schema = schema_->AddMetadata(metadata);
- return std::make_shared<Table>(new_schema, columns_);
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+ const std::vector<std::shared_ptr<Array>>&
arrays,
+ int64_t num_rows) {
+ return std::make_shared<SimpleTable>(schema, arrays, num_rows);
}
Status Table::FromRecordBatches(const
std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -351,7 +307,7 @@ Status Table::FromRecordBatches(const
std::vector<std::shared_ptr<RecordBatch>>&
columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
}
- *table = std::make_shared<Table>(schema, columns);
+ *table = Table::Make(schema, columns);
return Status::OK();
}
@@ -388,7 +344,7 @@ Status ConcatenateTables(const
std::vector<std::shared_ptr<Table>>& tables,
}
columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
}
- *table = std::make_shared<Table>(schema, columns);
+ *table = Table::Make(schema, columns);
return Status::OK();
}
@@ -399,82 +355,19 @@ bool Table::Equals(const Table& other) const {
if (!schema_->Equals(*other.schema())) {
return false;
}
- if (static_cast<int64_t>(columns_.size()) != other.num_columns()) {
+ if (this->num_columns() != other.num_columns()) {
return false;
}
- for (int i = 0; i < static_cast<int>(columns_.size()); i++) {
- if (!columns_[i]->Equals(other.column(i))) {
+ for (int i = 0; i < this->num_columns(); i++) {
+ if (!this->column(i)->Equals(other.column(i))) {
return false;
}
}
return true;
}
-Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const {
- std::shared_ptr<Schema> new_schema;
- RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
-
- *out = std::make_shared<Table>(new_schema,
internal::DeleteVectorElement(columns_, i));
- return Status::OK();
-}
-
-Status Table::AddColumn(int i, const std::shared_ptr<Column>& col,
- std::shared_ptr<Table>* out) const {
- if (i < 0 || i > num_columns() + 1) {
- return Status::Invalid("Invalid column index.");
- }
- if (col == nullptr) {
- std::stringstream ss;
- ss << "Column " << i << " was null";
- return Status::Invalid(ss.str());
- }
- if (col->length() != num_rows_) {
- std::stringstream ss;
- ss << "Added column's length must match table's length. Expected length "
<< num_rows_
- << " but got length " << col->length();
- return Status::Invalid(ss.str());
- }
-
- std::shared_ptr<Schema> new_schema;
- RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
-
- *out =
- std::make_shared<Table>(new_schema, internal::AddVectorElement(columns_,
i, col));
- return Status::OK();
-}
-
-Status Table::ValidateColumns() const {
- if (num_columns() != schema_->num_fields()) {
- return Status::Invalid("Number of columns did not match schema");
- }
-
- // Make sure columns are all the same length
- for (size_t i = 0; i < columns_.size(); ++i) {
- const Column* col = columns_[i].get();
- if (col == nullptr) {
- std::stringstream ss;
- ss << "Column " << i << " was null";
- return Status::Invalid(ss.str());
- }
- if (col->length() != num_rows_) {
- std::stringstream ss;
- ss << "Column " << i << " named " << col->name() << " expected length "
<< num_rows_
- << " but got length " << col->length();
- return Status::Invalid(ss.str());
- }
- }
- return Status::OK();
-}
-
-bool Table::IsChunked() const {
- for (size_t i = 0; i < columns_.size(); ++i) {
- if (columns_[i]->data()->num_chunks() > 1) {
- return true;
- }
- }
- return false;
-}
+#ifndef ARROW_NO_DEPRECATED_API
Status MakeTable(const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Array>>& arrays,
@@ -493,15 +386,12 @@ Status MakeTable(const std::shared_ptr<Schema>& schema,
columns.emplace_back(std::make_shared<Column>(schema->field(i),
arrays[i]));
}
- *table = std::make_shared<Table>(schema, columns);
+ *table = Table::Make(schema, columns);
return Status::OK();
}
-// ----------------------------------------------------------------------
-// Base record batch reader
-
-RecordBatchReader::~RecordBatchReader() {}
+#endif // ARROW_NO_DEPRECATED_API
// ----------------------------------------------------------------------
// Convert a table to a sequence of record batches
@@ -565,8 +455,7 @@ class TableBatchReader::TableBatchReaderImpl {
}
absolute_row_position_ += chunksize;
- *out =
- std::make_shared<RecordBatch>(table_.schema(), chunksize,
std::move(batch_data));
+ *out = RecordBatch::Make(table_.schema(), chunksize,
std::move(batch_data));
return Status::OK();
}
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2cff32f74..d0312d93c 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -24,6 +24,7 @@
#include <vector>
#include "arrow/array.h"
+#include "arrow/record_batch.h"
#include "arrow/type.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -33,8 +34,6 @@ namespace arrow {
class KeyValueMetadata;
class Status;
-using ArrayVector = std::vector<std::shared_ptr<Array>>;
-
/// \class ChunkedArray
/// \brief A data structure managing a list of primitive Arrow arrays logically
/// as one large array
@@ -113,123 +112,28 @@ class ARROW_EXPORT Column {
ARROW_DISALLOW_COPY_AND_ASSIGN(Column);
};
-/// \class RecordBatch
-/// \brief Collection of equal-length arrays matching a particular Schema
-///
-/// A record batch is table-like data structure consisting of an internal
-/// sequence of fields, each a contiguous Arrow array
-class ARROW_EXPORT RecordBatch {
- public:
- /// \param[in] schema The record batch schema
- /// \param[in] num_rows length of fields in the record batch. Each array
- /// should have the same length as num_rows
- /// \param[in] columns the record batch fields as vector of arrays
- RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
- const std::vector<std::shared_ptr<Array>>& columns);
-
- /// \brief Move-based constructor for a vector of Array instances
- RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
- std::vector<std::shared_ptr<Array>>&& columns);
-
- /// \brief Construct record batch from vector of internal data structures
- /// \since 0.5.0
- ///
- /// This class is only provided with an rvalue-reference for the input data,
- /// and is intended for internal use, or advanced users.
- ///
- /// \param schema the record batch schema
- /// \param num_rows the number of semantic rows in the record batch. This
- /// should be equal to the length of each field
- /// \param columns the data for the batch's columns
- RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
- std::vector<std::shared_ptr<ArrayData>>&& columns);
-
- /// \brief Construct record batch by copying vector of array data
- /// \since 0.5.0
- RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
- const std::vector<std::shared_ptr<ArrayData>>& columns);
-
- /// \brief Determine if two record batches are exactly equal
- /// \return true if batches are equal
- bool Equals(const RecordBatch& other) const;
-
- /// \brief Determine if two record batches are approximately equal
- bool ApproxEquals(const RecordBatch& other) const;
-
- // \return the table's schema
- /// \return true if batches are equal
- std::shared_ptr<Schema> schema() const { return schema_; }
-
- /// \brief Retrieve an array from the record batch
- /// \param[in] i field index, does not boundscheck
- /// \return an Array object
- std::shared_ptr<Array> column(int i) const;
-
- std::shared_ptr<ArrayData> column_data(int i) const { return columns_[i]; }
-
- /// \brief Name in i-th column
- const std::string& column_name(int i) const;
-
- /// \return the number of columns in the table
- int num_columns() const { return static_cast<int>(columns_.size()); }
-
- /// \return the number of rows (the corresponding length of each column)
- int64_t num_rows() const { return num_rows_; }
-
- /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
- /// \since 0.5.0
- ///
- /// \param[in] metadata new KeyValueMetadata
- /// \return new RecordBatch
- std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
- const std::shared_ptr<const KeyValueMetadata>& metadata) const;
-
- /// \brief Slice each of the arrays in the record batch
- /// \param[in] offset the starting offset to slice, through end of batch
- /// \return new record batch
- std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
-
- /// \brief Slice each of the arrays in the record batch
- /// \param[in] offset the starting offset to slice
- /// \param[in] length the number of elements to slice from offset
- /// \return new record batch
- std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const;
-
- /// \brief Check for schema or length inconsistencies
- /// \return Status
- Status Validate() const;
-
- private:
- ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
-
- RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
-
- std::shared_ptr<Schema> schema_;
- int64_t num_rows_;
- std::vector<std::shared_ptr<ArrayData>> columns_;
-
- // Caching boxed array data
- mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
-};
-
/// \class Table
/// \brief Logical table as sequence of chunked arrays
class ARROW_EXPORT Table {
public:
+ virtual ~Table() = default;
+
/// \brief Construct Table from schema and columns
/// If columns is zero-length, the table's number of rows is zero
/// \param schema The table schema (column types)
/// \param columns The table's columns
/// \param num_rows number of rows in table, -1 (default) to infer from
columns
- Table(const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows
= -1);
+ static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+ const
std::vector<std::shared_ptr<Column>>& columns,
+ int64_t num_rows = -1);
/// \brief Construct Table from schema and arrays
/// \param schema The table schema (column types)
/// \param arrays The table's columns as arrays
/// \param num_rows number of rows in table, -1 (default) to infer from
columns
- Table(const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<Array>>& arrays, int64_t num_rows =
-1);
+ static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+ const
std::vector<std::shared_ptr<Array>>& arrays,
+ int64_t num_rows = -1);
// Construct table from RecordBatch, but only if all of the batch schemas are
// equal. Returns Status::Invalid if there is some problem
@@ -242,25 +146,28 @@ class ARROW_EXPORT Table {
/// \param[in] i column index, does not boundscheck
/// \return the i-th column
- std::shared_ptr<Column> column(int i) const { return columns_[i]; }
+ virtual std::shared_ptr<Column> column(int i) const = 0;
/// \brief Remove column from the table, producing a new Table
- Status RemoveColumn(int i, std::shared_ptr<Table>* out) const;
+ virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0;
/// \brief Add column to the table, producing a new Table
- Status AddColumn(int i, const std::shared_ptr<Column>& column,
- std::shared_ptr<Table>* out) const;
+ virtual Status AddColumn(int i, const std::shared_ptr<Column>& column,
+ std::shared_ptr<Table>* out) const = 0;
/// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
/// \since 0.5.0
///
/// \param[in] metadata new KeyValueMetadata
/// \return new Table
- std::shared_ptr<Table> ReplaceSchemaMetadata(
- const std::shared_ptr<const KeyValueMetadata>& metadata) const;
+ virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+ /// \brief Perform any checks to validate the input arguments
+ virtual Status Validate() const = 0;
/// \return the number of columns in the table
- int num_columns() const { return static_cast<int>(columns_.size()); }
+ int num_columns() const { return schema_->num_fields(); }
/// \return the number of rows (the corresponding length of each column)
int64_t num_rows() const { return num_rows_; }
@@ -268,35 +175,14 @@ class ARROW_EXPORT Table {
/// \brief Determine if semantic contents of tables are exactly equal
bool Equals(const Table& other) const;
- /// \brief Perform any checks to validate the input arguments
- Status ValidateColumns() const;
-
- /// \brief Return true if any column has multiple chunks
- bool IsChunked() const;
-
- private:
- ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
+ protected:
+ Table();
std::shared_ptr<Schema> schema_;
- std::vector<std::shared_ptr<Column>> columns_;
-
int64_t num_rows_;
-};
-
-/// \brief Abstract interface for reading stream of record batches
-class ARROW_EXPORT RecordBatchReader {
- public:
- virtual ~RecordBatchReader();
- /// \return the shared schema of the record batches in the stream
- virtual std::shared_ptr<Schema> schema() const = 0;
-
- /// Read the next record batch in the stream. Return null for batch when
- /// reaching end of stream
- ///
- /// \param[out] batch the next loaded batch, null at end of stream
- /// \return Status
- virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
};
/// \brief Compute a sequence of record batches from a (possibly chunked) Table
@@ -322,13 +208,18 @@ ARROW_EXPORT
Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
std::shared_ptr<Table>* table);
+#ifndef ARROW_NO_DEPRECATED_API
+
/// \brief Construct table from multiple input tables.
/// \return Status, fails if any schemas are different
+/// \note Deprecated since 0.8.0
ARROW_EXPORT
Status MakeTable(const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Array>>& arrays,
std::shared_ptr<Table>* table);
+#endif
+
} // namespace arrow
#endif // ARROW_TABLE_H
diff --git a/cpp/src/arrow/table_builder-test.cc
b/cpp/src/arrow/table_builder-test.cc
index 07d9b6b2d..8167577e9 100644
--- a/cpp/src/arrow/table_builder-test.cc
+++ b/cpp/src/arrow/table_builder-test.cc
@@ -22,6 +22,7 @@
#include "gtest/gtest.h"
#include "arrow/array.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
@@ -98,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
ASSERT_OK(ex_b1.Finish(&a1));
ASSERT_OK(ex_b2.Finish(&a2));
- RecordBatch expected(schema, 4, {a0, a1, a2});
+ auto expected = RecordBatch::Make(schema, 4, {a0, a1, a2});
// Builder attributes
ASSERT_EQ(3, builder->num_fields());
@@ -119,7 +120,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
ASSERT_OK(builder->Flush(&batch));
}
- ASSERT_BATCHES_EQUAL(expected, *batch);
+ ASSERT_BATCHES_EQUAL(*expected, *batch);
}
// Test setting initial capacity
diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc
index a1bd95940..379d886de 100644
--- a/cpp/src/arrow/table_builder.cc
+++ b/cpp/src/arrow/table_builder.cc
@@ -24,6 +24,7 @@
#include "arrow/array.h"
#include "arrow/builder.h"
+#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
@@ -64,7 +65,7 @@ Status RecordBatchBuilder::Flush(bool reset_builders,
}
length = fields[i]->length();
}
- *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields));
+ *batch = RecordBatch::Make(schema_, length, std::move(fields));
if (reset_builders) {
return InitBuilders();
} else {
diff --git a/cpp/src/arrow/test-common.h b/cpp/src/arrow/test-common.h
index a4c4fddff..911adf7b6 100644
--- a/cpp/src/arrow/test-common.h
+++ b/cpp/src/arrow/test-common.h
@@ -30,7 +30,6 @@
#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
-#include "arrow/table.h"
#include "arrow/test-util.h"
namespace arrow {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 77f489ab1..1a3480848 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -35,7 +35,6 @@
#include "arrow/memory_pool.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
-#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
@@ -375,7 +374,7 @@ void AssertArraysEqual(const Array& expected, const Array&
actual) {
#define ASSERT_BATCHES_EQUAL(LEFT, RIGHT) \
do { \
- if (!LEFT.ApproxEquals(RIGHT)) { \
+ if (!(LEFT).ApproxEquals(RIGHT)) { \
std::stringstream ss; \
ss << "Left:\n"; \
ASSERT_OK(PrettyPrint(LEFT, 0, &ss)); \
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 70f275c0f..8dcc1592d 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -498,9 +498,9 @@ class ARROW_EXPORT StructType : public NestedType {
std::vector<BufferDescr> GetBufferLayout() const override;
};
-class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
+class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
public:
- explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t
scale)
+ explicit DecimalType(int32_t byte_width, int32_t precision, int32_t scale)
: FixedSizeBinaryType(byte_width, Type::DECIMAL),
precision_(precision),
scale_(scale) {}
@@ -513,21 +513,18 @@ class ARROW_EXPORT DecimalBaseType : public
FixedSizeBinaryType {
int32_t scale_;
};
-class ARROW_EXPORT Decimal128Type : public DecimalBaseType {
+class ARROW_EXPORT Decimal128Type : public DecimalType {
public:
static constexpr Type::type type_id = Type::DECIMAL;
explicit Decimal128Type(int32_t precision, int32_t scale)
- : DecimalBaseType(16, precision, scale) {}
+ : DecimalType(16, precision, scale) {}
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
std::string name() const override { return "decimal"; }
};
-// TODO(wesm): Remove this
-using DecimalType = Decimal128Type;
-
struct UnionMode {
enum type { SPARSE, DENSE };
};
diff --git a/python/pyarrow/includes/libarrow.pxd
b/python/pyarrow/includes/libarrow.pxd
index dbfd89cc3..73e34c7b2 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -403,8 +403,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CChunkedArray] data()
cdef cppclass CRecordBatch" arrow::RecordBatch":
- CRecordBatch(const shared_ptr[CSchema]& schema, int64_t num_rows,
- const vector[shared_ptr[CArray]]& columns)
+ @staticmethod
+ shared_ptr[CRecordBatch] Make(
+ const shared_ptr[CSchema]& schema, int64_t num_rows,
+ const vector[shared_ptr[CArray]]& columns)
c_bool Equals(const CRecordBatch& other)
@@ -427,6 +429,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CTable(const shared_ptr[CSchema]& schema,
const vector[shared_ptr[CColumn]]& columns)
+ @staticmethod
+ shared_ptr[CTable] Make(
+ const shared_ptr[CSchema]& schema,
+ const vector[shared_ptr[CColumn]]& columns)
+
@staticmethod
CStatus FromRecordBatches(
const vector[shared_ptr[CRecordBatch]]& batches,
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 591f32975..8c5b8bbc3 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -724,7 +724,6 @@ cdef class RecordBatch:
Array arr
c_string c_name
shared_ptr[CSchema] schema
- shared_ptr[CRecordBatch] batch
vector[shared_ptr[CArray]] c_arrays
int64_t num_rows
int64_t i
@@ -740,8 +739,8 @@ cdef class RecordBatch:
for arr in arrays:
c_arrays.push_back(arr.sp_array)
- batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
- return pyarrow_wrap_batch(batch)
+ return pyarrow_wrap_batch(
+ CRecordBatch.Make(schema, num_rows, c_arrays))
def table_to_blocks(PandasOptions options, Table table, int nthreads,
@@ -946,8 +945,7 @@ cdef class Table:
else:
raise ValueError(type(arrays[i]))
- table.reset(new CTable(c_schema, columns))
- return pyarrow_wrap_table(table)
+ return pyarrow_wrap_table(CTable.Make(c_schema, columns))
@staticmethod
def from_batches(batches):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [C++] Make RecordBatch interface virtual to permit record batches that
> lazy-materialize columns
> -----------------------------------------------------------------------------------------------
>
> Key: ARROW-1808
> URL: https://issues.apache.org/jira/browse/ARROW-1808
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++
> Reporter: Wes McKinney
> Assignee: Wes McKinney
> Labels: pull-request-available
> Fix For: 0.8.0
>
>
> This should be looked at soon to prevent having to define a different virtual
> interface for record batches. There are places where we are using the record
> batch constructor directly, and in some third party code (like MapD), so this
> might be good to get done for 0.8.0
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)