This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new fc4e2c3  ARROW-1808: [C++] Make RecordBatch, Table virtual interfaces for column access
fc4e2c3 is described below

commit fc4e2c36d2c56a8bd5d1ab17eeb406826924d3e5
Author: Wes McKinney <wes.mckin...@twosigma.com>
AuthorDate: Tue Nov 21 19:01:48 2017 -0500

    ARROW-1808: [C++] Make RecordBatch, Table virtual interfaces for column access
    
    While this will cause some minor API breakage in parquet-cpp and some
    other downstream users, the change is long overdue. It permits
    implementations of the RecordBatch or Table interface that do lazy IO /
    data loading or lazy materialization of columns.
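
    As a rough sketch of the kind of implementation this enables (a
    hypothetical subclass, not part of this patch; LoadColumn is an assumed
    IO routine):

        #include <memory>
        #include <vector>
        #include "arrow/record_batch.h"

        // Hypothetical record batch that defers column IO until first access.
        class LazyRecordBatch : public arrow::RecordBatch {
         public:
          LazyRecordBatch(const std::shared_ptr<arrow::Schema>& schema,
                          int64_t num_rows)
              : arrow::RecordBatch(schema, num_rows),
                cache_(schema->num_fields()) {}

          using arrow::RecordBatch::Slice;  // keep one-argument Slice visible

          std::shared_ptr<arrow::Array> column(int i) const override {
            // Load and cache the column the first time it is requested
            if (!cache_[i]) {
              cache_[i] = LoadColumn(i);
            }
            return cache_[i];
          }

          std::shared_ptr<arrow::ArrayData> column_data(int i) const override {
            return column(i)->data();
          }

          std::shared_ptr<arrow::RecordBatch> Slice(int64_t offset,
                                                    int64_t length) const override {
            // Materialize all columns, then slice the simple in-memory batch
            return Materialize()->Slice(offset, length);
          }

          std::shared_ptr<arrow::RecordBatch> ReplaceSchemaMetadata(
              const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
              const override {
            return Materialize()->ReplaceSchemaMetadata(metadata);
          }

         private:
          std::shared_ptr<arrow::RecordBatch> Materialize() const {
            std::vector<std::shared_ptr<arrow::Array>> columns;
            for (int i = 0; i < num_columns(); ++i) {
              columns.push_back(column(i));
            }
            return arrow::RecordBatch::Make(schema_, num_rows_, columns);
          }

          // Assumed to read column i from storage; definition omitted
          std::shared_ptr<arrow::Array> LoadColumn(int i) const;

          mutable std::vector<std::shared_ptr<arrow::Array>> cache_;
        };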
    
    I will write a patch to fix up parquet-cpp, and will look at whether glib
    is easy to fix. There is no way to merge this patch with a green build,
    since a green build is not possible until downstream code is updated.
    Once we are happy with the patch, I can merge it and then work on getting
    a green build in parquet-cpp so that its build is not broken for too long.
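
    For downstream code, the migration is mostly mechanical. Illustratively
    (variable names are placeholders; the substitutions mirror the ones made
    throughout this diff):

        // Before: concrete constructors
        auto batch = std::make_shared<arrow::RecordBatch>(schema, num_rows, arrays);
        auto table = std::make_shared<arrow::Table>(schema, columns);

        // After: RecordBatch and Table are abstract; use the Make factories
        auto batch = arrow::RecordBatch::Make(schema, num_rows, arrays);
        auto table = arrow::Table::Make(schema, columns);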
    
    Author: Wes McKinney <wes.mckin...@twosigma.com>
    Author: Kouhei Sutou <k...@clear-code.com>
    
    Closes #1337 from wesm/ARROW-1808 and squashes the following commits:
    
    55ce663d [Wes McKinney] clang-format
    57923708 [Wes McKinney] Revert to debug build for now
    06625301 [Wes McKinney] Fix test case that wasn't being run in debug builds
    c9a7cb83 [Kouhei Sutou] Apply patch to fix Glib test suite
    6b02e438 [Wes McKinney] Move DCHECK in SimpleRecordBatch ctor to Validate
    06bced1a [Wes McKinney] Deprecate arrow::MakeTable
    498bb082 [Wes McKinney] Fix glib compilation
    a4e403c3 [Wes McKinney] Set build warning level via environment variable
    9a61beb5 [Wes McKinney] Move boxed_columns_ to SimpleTable. Remove DecimalType backwards compat define
    7691a5f1 [Wes McKinney] Make Table virtual also, refactor
    bcc0cd15 [Wes McKinney] Some table refactoring to be virtual
    267dd218 [Wes McKinney] Finish RecordBatch refactoring, get tests passing again. Add option to set build type in Travis CI
    ef00e5b9 [Wes McKinney] Split off record batch implementation into separate files, progress refactoring
    33f341df [Wes McKinney] Start making things virtual
---
 .travis.yml                                   |   2 +
 c_glib/arrow-glib/record-batch.cpp            |   5 +-
 c_glib/arrow-glib/table.cpp                   |   3 +-
 c_glib/test/test-file-writer.rb               |  16 +-
 c_glib/test/test-gio-input-stream.rb          |  18 +-
 c_glib/test/test-gio-output-stream.rb         |  18 +-
 c_glib/test/test-stream-writer.rb             |  18 +-
 ci/travis_before_script_cpp.sh                |   6 +-
 ci/travis_env_common.sh                       |   3 +
 ci/travis_script_python.sh                    |   3 +
 cpp/src/arrow/CMakeLists.txt                  |   2 +
 cpp/src/arrow/api.h                           |   1 +
 cpp/src/arrow/array.h                         |   2 +
 cpp/src/arrow/builder.cc                      |   1 -
 cpp/src/arrow/builder.h                       |   1 -
 cpp/src/arrow/column-benchmark.cc             |   1 +
 cpp/src/arrow/compute/kernel.h                |   1 +
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc           |   2 +-
 cpp/src/arrow/ipc/feather-test.cc             |   5 +-
 cpp/src/arrow/ipc/feather.cc                  |   1 +
 cpp/src/arrow/ipc/ipc-json-test.cc            |   4 +-
 cpp/src/arrow/ipc/ipc-read-write-benchmark.cc |   2 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc      |  19 +-
 cpp/src/arrow/ipc/json-integration-test.cc    |   2 +-
 cpp/src/arrow/ipc/json-internal.cc            |   8 +-
 cpp/src/arrow/ipc/json.cc                     |   2 +-
 cpp/src/arrow/ipc/reader.cc                   |   4 +-
 cpp/src/arrow/ipc/reader.h                    |   3 +-
 cpp/src/arrow/ipc/test-common.h               |  44 ++--
 cpp/src/arrow/ipc/writer.cc                   |  10 +-
 cpp/src/arrow/pretty_print.cc                 |   2 +-
 cpp/src/arrow/python/python-test.cc           |   5 +-
 cpp/src/arrow/python/python_to_arrow.cc       |   8 +-
 cpp/src/arrow/record_batch.cc                 | 206 +++++++++++++++++
 cpp/src/arrow/record_batch.h                  | 154 +++++++++++++
 cpp/src/arrow/table-test.cc                   | 177 +++++++--------
 cpp/src/arrow/table.cc                        | 315 +++++++++-----------------
 cpp/src/arrow/table.h                         | 165 +++-----------
 cpp/src/arrow/table_builder-test.cc           |   5 +-
 cpp/src/arrow/table_builder.cc                |   3 +-
 cpp/src/arrow/test-common.h                   |   1 -
 cpp/src/arrow/test-util.h                     |   3 +-
 cpp/src/arrow/type.h                          |  11 +-
 python/pyarrow/includes/libarrow.pxd          |  11 +-
 python/pyarrow/table.pxi                      |   8 +-
 45 files changed, 725 insertions(+), 556 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 9c714a6..ddadf73 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -55,6 +55,7 @@ matrix:
     - export ARROW_TRAVIS_VALGRIND=1
     - export ARROW_TRAVIS_PLASMA=1
     - export ARROW_TRAVIS_CLANG_FORMAT=1
+    - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
     - export CC="clang-4.0"
     - export CXX="clang++-4.0"
     - $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh
@@ -74,6 +75,7 @@ matrix:
     before_script:
     - export ARROW_TRAVIS_USE_TOOLCHAIN=1
     - export ARROW_TRAVIS_PLASMA=1
+    - export ARROW_BUILD_WARNING_LEVEL=CHECKIN
     - travis_wait 50 $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
     script:
     - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index f381af0..f23a0cf 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -150,9 +150,8 @@ garrow_record_batch_new(GArrowSchema *schema,
   }
 
   auto arrow_record_batch =
-    std::make_shared<arrow::RecordBatch>(garrow_schema_get_raw(schema),
-                                         n_rows,
-                                         arrow_columns);
+    arrow::RecordBatch::Make(garrow_schema_get_raw(schema),
+                             n_rows, arrow_columns);
   return garrow_record_batch_new_raw(&arrow_record_batch);
 }
 
diff --git a/c_glib/arrow-glib/table.cpp b/c_glib/arrow-glib/table.cpp
index 779f2ef..e086396 100644
--- a/c_glib/arrow-glib/table.cpp
+++ b/c_glib/arrow-glib/table.cpp
@@ -143,8 +143,7 @@ garrow_table_new(GArrowSchema *schema,
   }
 
   auto arrow_table =
-    std::make_shared<arrow::Table>(garrow_schema_get_raw(schema),
-                                   arrow_columns);
+    arrow::Table::Make(garrow_schema_get_raw(schema), arrow_columns);
   return garrow_table_new_raw(&arrow_table);
 }
 
diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb
index 3de8e5c..67aed85 100644
--- a/c_glib/test/test-file-writer.rb
+++ b/c_glib/test/test-file-writer.rb
@@ -19,14 +19,18 @@ class TestFileWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-file-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +42,12 @@ class TestFileWriter < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-input-stream.rb b/c_glib/test/test-gio-input-stream.rb
index a71a370..2adf25b 100644
--- a/c_glib/test/test-gio-input-stream.rb
+++ b/c_glib/test/test-gio-input-stream.rb
@@ -16,15 +16,21 @@
 # under the License.
 
 class TestGIOInputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_reader_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-input-stream")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ class TestGIOInputStream < Test::Unit::TestCase
     input = Arrow::GIOInputStream.new(input_stream)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-gio-output-stream.rb b/c_glib/test/test-gio-output-stream.rb
index adaa8c1..c77598e 100644
--- a/c_glib/test/test-gio-output-stream.rb
+++ b/c_glib/test/test-gio-output-stream.rb
@@ -16,17 +16,23 @@
 # under the License.
 
 class TestGIOOutputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_writer_backend
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-gio-output-stream")
     file = Gio::File.new_for_path(tempfile.path)
     output_stream = file.append_to(:none)
     output = Arrow::GIOOutputStream.new(output_stream)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       file_writer = Arrow::RecordBatchFileWriter.new(output, schema)
       begin
-        record_batch = Arrow::RecordBatch.new(schema, 0, [])
+        record_batch = Arrow::RecordBatch.new(schema,
+                                              data.size,
+                                              [build_boolean_array(data)])
         file_writer.write_record_batch(record_batch)
       ensure
         file_writer.close
@@ -38,8 +44,12 @@ class TestGIOOutputStream < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       file_reader = Arrow::RecordBatchFileReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    file_reader.schema.fields.collect(&:name))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   file_reader.read_record_batch(0))
     ensure
       input.close
     end
diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb
index c3d0e14..32754e2 100644
--- a/c_glib/test/test-stream-writer.rb
+++ b/c_glib/test/test-stream-writer.rb
@@ -19,17 +19,19 @@ class TestStreamWriter < Test::Unit::TestCase
   include Helper::Buildable
 
   def test_write_record_batch
+    data = [true]
+    field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
+    schema = Arrow::Schema.new([field])
+
     tempfile = Tempfile.open("arrow-ipc-stream-writer")
     output = Arrow::FileOutputStream.new(tempfile.path, false)
     begin
-      field = Arrow::Field.new("enabled", Arrow::BooleanDataType.new)
-      schema = Arrow::Schema.new([field])
       stream_writer = Arrow::RecordBatchStreamWriter.new(output, schema)
       begin
         columns = [
-          build_boolean_array([true]),
+          build_boolean_array(data),
         ]
-        record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+        record_batch = Arrow::RecordBatch.new(schema, data.size, columns)
         stream_writer.write_record_batch(record_batch)
       ensure
         stream_writer.close
@@ -41,10 +43,12 @@ class TestStreamWriter < Test::Unit::TestCase
     input = Arrow::MemoryMappedInputStream.new(tempfile.path)
     begin
       stream_reader = Arrow::RecordBatchStreamReader.new(input)
-      assert_equal(["enabled"],
+      assert_equal([field.name],
                    stream_reader.schema.fields.collect(&:name))
-      assert_equal(true,
-                   stream_reader.read_next.get_column(0).get_value(0))
+      assert_equal(Arrow::RecordBatch.new(schema,
+                                          data.size,
+                                          [build_boolean_array(data)]),
+                   stream_reader.read_next)
       assert_nil(stream_reader.read_next)
     ensure
       input.close
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 4998f19..664f7ce 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -91,12 +91,14 @@ fi
 if [ $TRAVIS_OS_NAME == "linux" ]; then
     cmake $CMAKE_COMMON_FLAGS \
           $CMAKE_LINUX_FLAGS \
-          -DBUILD_WARNING_LEVEL=CHECKIN \
+          -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+          -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
           $ARROW_CPP_DIR
 else
     cmake $CMAKE_COMMON_FLAGS \
           $CMAKE_OSX_FLAGS \
-          -DBUILD_WARNING_LEVEL=CHECKIN \
+          -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
+          -DBUILD_WARNING_LEVEL=$ARROW_BUILD_WARNING_LEVEL \
           $ARROW_CPP_DIR
 fi
 
diff --git a/ci/travis_env_common.sh b/ci/travis_env_common.sh
index 52c7da4..21b6e26 100755
--- a/ci/travis_env_common.sh
+++ b/ci/travis_env_common.sh
@@ -38,6 +38,9 @@ export ARROW_PYTHON_PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
 
 export CMAKE_EXPORT_COMPILE_COMMANDS=1
 
+export ARROW_BUILD_TYPE=${ARROW_BUILD_TYPE:=debug}
+export ARROW_BUILD_WARNING_LEVEL=${ARROW_BUILD_WARNING_LEVEL:=Production}
+
 if [ "$ARROW_TRAVIS_USE_TOOLCHAIN" == "1" ]; then
   # C++ toolchain
   export CPP_TOOLCHAIN=$TRAVIS_BUILD_DIR/cpp-toolchain
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 603201b..5f7b0a9 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -63,6 +63,7 @@ cmake -GNinja \
       -DARROW_BUILD_UTILITIES=off \
       -DARROW_PLASMA=on \
       -DARROW_PYTHON=on \
+      -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
       -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
       $ARROW_CPP_DIR
 
@@ -78,6 +79,8 @@ if [ "$PYTHON_VERSION" == "2.7" ]; then
   pip install futures
 fi
 
+export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
+
 pip install -r requirements.txt
 python setup.py build_ext --with-parquet --with-plasma \
        install --single-version-externally-managed --record=record.text
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 496e0da..9470578 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -22,6 +22,7 @@ set(ARROW_SRCS
   compare.cc
   memory_pool.cc
   pretty_print.cc
+  record_batch.cc
   status.cc
   table.cc
   table_builder.cc
@@ -144,6 +145,7 @@ install(FILES
   compare.h
   memory_pool.h
   pretty_print.h
+  record_batch.h
   status.h
   table.h
   table_builder.h
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index 5d2e859..7cae841 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -26,6 +26,7 @@
 #include "arrow/compare.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/table_builder.h"
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 28756a6..dda9dd3 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -279,6 +279,8 @@ class ARROW_EXPORT Array {
   ARROW_DISALLOW_COPY_AND_ASSIGN(Array);
 };
 
+using ArrayVector = std::vector<std::shared_ptr<Array>>;
+
 static inline std::ostream& operator<<(std::ostream& os, const Array& x) {
   os << x.ToString();
   return os;
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 3e213fc..a42f902 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -28,7 +28,6 @@
 #include "arrow/buffer.h"
 #include "arrow/compare.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 32741b5..e59e166 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -29,7 +29,6 @@
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
diff --git a/cpp/src/arrow/column-benchmark.cc b/cpp/src/arrow/column-benchmark.cc
index e50ddf6..af2c368 100644
--- a/cpp/src/arrow/column-benchmark.cc
+++ b/cpp/src/arrow/column-benchmark.cc
@@ -19,6 +19,7 @@
 
 #include "arrow/array.h"
 #include "arrow/memory_pool.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 7ff506c..0bfa55c 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/variant.h"
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 022268e..a7262c8 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -27,8 +27,8 @@
 #include "arrow/ipc/message.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/util/visibility.h"
 
 #include "arrow/gpu/cuda_context.h"
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index 6bd1646..e3de17f 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -29,6 +29,7 @@
 #include "arrow/ipc/feather.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/pretty_print.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 namespace arrow {
@@ -376,8 +377,8 @@ TEST_F(TestTableWriter, TimeTypes) {
         schema->field(i)->type(), values->length(), buffers, values->null_count(), 0));
   }
 
-  RecordBatch batch(schema, values->length(), std::move(arrays));
-  CheckBatch(batch);
+  auto batch = RecordBatch::Make(schema, values->length(), std::move(arrays));
+  CheckBatch(*batch);
 }
 
 TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) {
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index cea720b..077dc39 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -32,6 +32,7 @@
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather_generated.h"
 #include "arrow/ipc/util.h"  // IWYU pragma: keep
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index a560f09..e496826 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -31,8 +31,8 @@
 #include "arrow/ipc/json.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
@@ -269,7 +269,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
     std::vector<std::shared_ptr<Array>> arrays;
 
     MakeBatchArrays(schema, num_rows, &arrays);
-    auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+    auto batch = RecordBatch::Make(schema, num_rows, arrays);
     batches.push_back(batch);
     ASSERT_OK(writer->WriteRecordBatch(*batch));
   }
diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
index 9ed0abd..8561fb8 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
@@ -63,7 +63,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie
   }
 
   auto schema = std::make_shared<Schema>(fields);
-  return std::make_shared<RecordBatch>(schema, length, arrays);
+  return RecordBatch::Make(schema, length, arrays);
 }
 
 static void BM_WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 40cd3f0..1fcbdac 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -197,8 +197,8 @@ class IpcTestFixture : public io::MemoryMapFixture {
     std::vector<std::shared_ptr<Field>> fields = {f0};
     auto schema = std::make_shared<Schema>(fields);
 
-    RecordBatch batch(schema, 0, {array});
-    CheckRoundtrip(batch, buffer_size);
+    auto batch = RecordBatch::Make(schema, 0, {array});
+    CheckRoundtrip(*batch, buffer_size);
   }
 
  protected:
@@ -292,13 +292,13 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
   auto CheckArray = [this](const std::shared_ptr<Array>& array) {
     auto f0 = field("f0", array->type());
     auto schema = ::arrow::schema({f0});
-    RecordBatch batch(schema, array->length(), {array});
-    auto sliced_batch = batch.Slice(0, 5);
+    auto batch = RecordBatch::Make(schema, array->length(), {array});
+    auto sliced_batch = batch->Slice(0, 5);
 
     int64_t full_size;
     int64_t sliced_size;
 
-    ASSERT_OK(GetRecordBatchSize(batch, &full_size));
+    ASSERT_OK(GetRecordBatchSize(*batch, &full_size));
     ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size));
     ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size;
 
@@ -411,8 +411,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
 
     *schema = ::arrow::schema({f0});
 
-    std::vector<std::shared_ptr<Array>> arrays = {array};
-    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+    *batch = RecordBatch::Make(*schema, batch_length, {array});
 
     std::stringstream ss;
     ss << "test-write-past-max-recursion-" << g_file_number++;
@@ -632,7 +631,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   std::vector<std::shared_ptr<Field>> fields = {f0};
   auto schema = std::make_shared<Schema>(fields);
 
-  RecordBatch batch(schema, length, {array});
+  auto batch = RecordBatch::Make(schema, length, {array});
 
   std::string path = "test-write-large-record_batch";
 
@@ -641,8 +640,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_));
 
   std::shared_ptr<RecordBatch> result;
-  ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
-  CheckReadResult(*result, batch);
+  ASSERT_OK(DoLargeRoundTrip(*batch, false, &result));
+  CheckReadResult(*result, *batch);
 
   ASSERT_EQ(length, result->num_rows());
 }
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index c7530a4..f487487 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -34,8 +34,8 @@
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index bdf1ef5..bfb3d28 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -28,8 +28,8 @@
 #include "arrow/array.h"
 #include "arrow/builder.h"
 #include "arrow/ipc/dictionary.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -125,8 +125,8 @@ class SchemaWriter {
 
     // Make a dummy record batch. A bit tedious as we have to make a schema
     auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())});
-    RecordBatch batch(schema, dictionary->length(), {dictionary});
-    RETURN_NOT_OK(WriteRecordBatch(batch, writer_));
+    auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+    RETURN_NOT_OK(WriteRecordBatch(*batch, writer_));
     writer_->EndObject();
     return Status::OK();
   }
@@ -1435,7 +1435,7 @@ Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr<Schema>&
     RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i]));
   }
 
-  *batch = std::make_shared<RecordBatch>(schema, num_rows, columns);
+  *batch = RecordBatch::Make(schema, num_rows, columns);
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index 30a1bb8..ea2947d 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -24,8 +24,8 @@
 #include "arrow/buffer.h"
 #include "arrow/ipc/json-internal.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/util/logging.h"
 
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8e10d7d..5960e81 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -37,8 +37,8 @@
 #include "arrow/ipc/message.h"
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/util.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
@@ -307,7 +307,7 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
     arrays[i] = std::move(arr);
   }
 
-  *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));
+  *out = RecordBatch::Make(schema, num_rows, std::move(arrays));
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 7581fbd..627f67e 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -24,13 +24,12 @@
 #include <memory>
 
 #include "arrow/ipc/message.h"
-#include "arrow/table.h"
+#include "arrow/record_batch.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class Buffer;
-class RecordBatch;
 class Schema;
 class Status;
 class Tensor;
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 7fc1393..6f8a0dc 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -30,8 +30,8 @@
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
@@ -184,7 +184,7 @@ Status MakeBooleanBatchSized(const int length, std::shared_ptr<RecordBatch>* out
   std::shared_ptr<Array> a0, a1;
   RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0));
   RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1));
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -203,7 +203,7 @@ Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
   MemoryPool* pool = default_memory_pool();
   RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
   RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -252,7 +252,7 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
     auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, true, pool, &a1);
     RETURN_NOT_OK(s);
   }
-  out->reset(new RecordBatch(schema, length, {a0, a1}));
+  *out = RecordBatch::Make(schema, length, {a0, a1});
   return Status::OK();
 }
 
@@ -261,7 +261,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   auto f0 = field("f0", null());
   auto schema = ::arrow::schema({f0});
   std::shared_ptr<Array> a0 = std::make_shared<NullArray>(length);
-  out->reset(new RecordBatch(schema, length, {a0}));
+  *out = RecordBatch::Make(schema, length, {a0});
   return Status::OK();
 }
 
@@ -284,7 +284,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -304,7 +304,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, 0, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -327,7 +327,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+  *out = RecordBatch::Make(schema, length, {list_array, list_list_array, flat_array});
   return Status::OK();
 }
 
@@ -347,7 +347,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
   auto f0 = field("f0", type);
   auto schema = ::arrow::schema({f0});
   std::vector<std::shared_ptr<Array>> arrays = {array};
-  out->reset(new RecordBatch(schema, batch_length, arrays));
+  *out = RecordBatch::Make(schema, batch_length, arrays);
   return Status::OK();
 }
 
@@ -377,7 +377,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
 
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls};
-  out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays));
+  *out = RecordBatch::Make(schema, list_batch->num_rows(), arrays);
   return Status::OK();
 }
 
@@ -445,7 +445,7 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
 
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense};
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -526,7 +526,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
 
   std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4};
 
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -564,7 +564,7 @@ Status MakeDictionaryFlat(std::shared_ptr<RecordBatch>* out) {
       {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)});
 
   std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2};
-  out->reset(new RecordBatch(schema, length, arrays));
+  *out = RecordBatch::Make(schema, length, arrays);
   return Status::OK();
 }
 
@@ -584,8 +584,7 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> date64_array;
   ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array);
 
-  std::vector<std::shared_ptr<Array>> arrays = {date32_array, date64_array};
-  *out = std::make_shared<RecordBatch>(schema, date32_array->length(), arrays);
+  *out = RecordBatch::Make(schema, date32_array->length(), {date32_array, date64_array});
   return Status::OK();
 }
 
@@ -604,8 +603,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<TimestampType, int64_t>(f1->type(), is_valid, ts_values, &a1);
   ArrayFromVector<TimestampType, int64_t>(f2->type(), is_valid, ts_values, &a2);
 
-  ArrayVector arrays = {a0, a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+  *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2});
   return Status::OK();
 }
 
@@ -628,8 +626,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<Time32Type, int32_t>(f2->type(), is_valid, t32_values, &a2);
   ArrayFromVector<Time64Type, int64_t>(f3->type(), is_valid, t64_values, &a3);
 
-  ArrayVector arrays = {a0, a1, a2, a3};
-  *out = std::make_shared<RecordBatch>(schema, a0->length(), arrays);
+  *out = RecordBatch::Make(schema, a0->length(), {a0, a1, a2, a3});
   return Status::OK();
 }
 
@@ -665,8 +662,7 @@ Status MakeFWBinary(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(b1.Finish(&a1));
   RETURN_NOT_OK(b2.Finish(&a2));
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+  *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
   return Status::OK();
 }
 
@@ -695,8 +691,7 @@ Status MakeDecimal(std::shared_ptr<RecordBatch>* out) {
 
   auto a2 = std::make_shared<Decimal128Array>(f1->type(), length, data);
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, length, arrays);
+  *out = RecordBatch::Make(schema, length, {a1, a2});
   return Status::OK();
 }
 
@@ -716,8 +711,7 @@ Status MakeNull(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> a2;
   ArrayFromVector<Int64Type, int64_t>(f1->type(), is_valid, int_values, &a2);
 
-  ArrayVector arrays = {a1, a2};
-  *out = std::make_shared<RecordBatch>(schema, a1->length(), arrays);
+  *out = RecordBatch::Make(schema, a1->length(), {a1, a2});
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 323116f..3c1db06 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -32,6 +32,7 @@
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/util.h"
 #include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/tensor.h"
@@ -508,12 +509,9 @@ class DictionaryWriter : public RecordBatchSerializer {
     dictionary_id_ = dictionary_id;
 
     // Make a dummy record batch. A bit tedious as we have to make a schema
-    std::vector<std::shared_ptr<Field>> fields = {
-        arrow::field("dictionary", dictionary->type())};
-    auto schema = std::make_shared<Schema>(fields);
-    RecordBatch batch(schema, dictionary->length(), {dictionary});
-
-    return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length);
+    auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())});
+    auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
+    return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length);
   }
 
  private:
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index cfbc303..bd5f8ce 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -22,8 +22,8 @@
 
 #include "arrow/array.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/logging.h"
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 86391a1..3b7d7d8 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -23,6 +23,7 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 #include "arrow/python/arrow_to_pandas.h"
@@ -81,8 +82,8 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
   std::vector<std::shared_ptr<Field>> fields = {f1, f2, f3};
   std::vector<std::shared_ptr<Array>> cols = {arr, arr, arr};
 
-  auto schema = std::make_shared<Schema>(fields);
-  auto table = std::make_shared<Table>(schema, cols);
+  auto schema = ::arrow::schema(fields);
+  auto table = Table::Make(schema, cols);
 
   PyObject* out;
   Py_BEGIN_ALLOW_THREADS;
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index b0c6287..72cc5b6 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -32,13 +32,15 @@
 #include "arrow/builder.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/record_batch.h"
+#include "arrow/tensor.h"
+#include "arrow/util/logging.h"
+
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/platform.h"
 #include "arrow/python/util/datetime.h"
-#include "arrow/tensor.h"
-#include "arrow/util/logging.h"
 
 constexpr int32_t kMaxRecursionDepth = 100;
 
@@ -694,7 +696,7 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
 std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
   auto field = std::make_shared<Field>("list", data->type());
   auto schema = ::arrow::schema({field});
-  return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
+  return RecordBatch::Make(schema, data->length(), {data});
 }
 
 Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
new file mode 100644
index 0000000..60932bd
--- /dev/null
+++ b/cpp/src/arrow/record_batch.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/record_batch.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+/// \class SimpleRecordBatch
+/// \brief A basic, non-lazy in-memory record batch
+class SimpleRecordBatch : public RecordBatch {
+ public:
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    const std::vector<std::shared_ptr<Array>>& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_.resize(columns.size());
+    boxed_columns_.resize(schema->num_fields());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] = columns[i]->data();
+    }
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    std::vector<std::shared_ptr<Array>>&& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_.resize(columns.size());
+    boxed_columns_.resize(schema->num_fields());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] = columns[i]->data();
+    }
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    std::vector<std::shared_ptr<ArrayData>>&& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_ = std::move(columns);
+    boxed_columns_.resize(schema->num_fields());
+  }
+
+  SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+                    const std::vector<std::shared_ptr<ArrayData>>& columns)
+      : RecordBatch(schema, num_rows) {
+    columns_ = columns;
+    boxed_columns_.resize(schema->num_fields());
+  }
+
+  std::shared_ptr<Array> column(int i) const override {
+    if (!boxed_columns_[i]) {
+      boxed_columns_[i] = MakeArray(columns_[i]);
+    }
+    DCHECK(boxed_columns_[i]);
+    return boxed_columns_[i];
+  }
+
+  std::shared_ptr<ArrayData> column_data(int i) const override { return columns_[i]; }
+
+  std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+    auto new_schema = schema_->AddMetadata(metadata);
+    return RecordBatch::Make(new_schema, num_rows_, columns_);
+  }
+
+  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const override {
+    std::vector<std::shared_ptr<ArrayData>> arrays;
+    arrays.reserve(num_columns());
+    for (const auto& field : columns_) {
+      int64_t col_length = std::min(field->length - offset, length);
+      int64_t col_offset = field->offset + offset;
+
+      auto new_data = std::make_shared<ArrayData>(*field);
+      new_data->length = col_length;
+      new_data->offset = col_offset;
+      new_data->null_count = kUnknownNullCount;
+      arrays.emplace_back(new_data);
+    }
+    int64_t num_rows = std::min(num_rows_ - offset, length);
+    return std::make_shared<SimpleRecordBatch>(schema_, num_rows, std::move(arrays));
+  }
+
+  Status Validate() const override {
+    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
+    }
+    return RecordBatch::Validate();
+  }
+
+ private:
+  std::vector<std::shared_ptr<ArrayData>> columns_;
+
+  // Caching boxed array data
+  mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
+};
+
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows)
+    : schema_(schema), num_rows_(num_rows) {}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    const std::vector<std::shared_ptr<Array>>& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<Array>>&& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<ArrayData>>&& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, std::move(columns));
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Make(
+    const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    const std::vector<std::shared_ptr<ArrayData>>& columns) {
+  return std::make_shared<SimpleRecordBatch>(schema, num_rows, columns);
+}
+
+const std::string& RecordBatch::column_name(int i) const {
+  return schema_->field(i)->name();
+}
+
+bool RecordBatch::Equals(const RecordBatch& other) const {
+  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+    return false;
+  }
+
+  for (int i = 0; i < num_columns(); ++i) {
+    if (!column(i)->Equals(other.column(i))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
+  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+    return false;
+  }
+
+  for (int i = 0; i < num_columns(); ++i) {
+    if (!column(i)->ApproxEquals(other.column(i))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
+  return Slice(offset, this->num_rows() - offset);
+}
+
+Status RecordBatch::Validate() const {
+  for (int i = 0; i < num_columns(); ++i) {
+    auto arr_shared = this->column_data(i);
+    const ArrayData& arr = *arr_shared;
+    if (arr.length != num_rows_) {
+      std::stringstream ss;
+      ss << "Number of rows in column " << i << " did not match batch: " << arr.length
+         << " vs " << num_rows_;
+      return Status::Invalid(ss.str());
+    }
+    const auto& schema_type = *schema_->field(i)->type();
+    if (!arr.type->Equals(schema_type)) {
+      std::stringstream ss;
+      ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs "
+         << schema_type.ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Base record batch reader
+
+RecordBatchReader::~RecordBatchReader() {}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
new file mode 100644
index 0000000..b2c4c76
--- /dev/null
+++ b/cpp/src/arrow/record_batch.h
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_RECORD_BATCH_H
+#define ARROW_RECORD_BATCH_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class KeyValueMetadata;
+class Status;
+
+/// \class RecordBatch
+/// \brief Collection of equal-length arrays matching a particular Schema
+///
+/// A record batch is a table-like data structure that is semantically a sequence
+/// of fields, each a contiguous Arrow array
+class ARROW_EXPORT RecordBatch {
+ public:
+  virtual ~RecordBatch() = default;
+
+  /// \param[in] schema The record batch schema
+  /// \param[in] num_rows length of fields in the record batch. Each array
+  /// should have the same length as num_rows
+  /// \param[in] columns the record batch fields as vector of arrays
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      const std::vector<std::shared_ptr<Array>>& columns);
+
+  /// \brief Move-based constructor for a vector of Array instances
+  static std::shared_ptr<RecordBatch> Make(const std::shared_ptr<Schema>& schema,
+                                           int64_t num_rows,
+                                           std::vector<std::shared_ptr<Array>>&& columns);
+
+  /// \brief Construct record batch from vector of internal data structures
+  /// \since 0.5.0
+  ///
+  /// This overload is only provided with an rvalue reference for the input data,
+  /// and is intended for internal use, or advanced users.
+  ///
+  /// \param schema the record batch schema
+  /// \param num_rows the number of semantic rows in the record batch. This
+  /// should be equal to the length of each field
+  /// \param columns the data for the batch's columns
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      std::vector<std::shared_ptr<ArrayData>>&& columns);
+
+  /// \brief Construct record batch by copying vector of array data
+  /// \since 0.5.0
+  static std::shared_ptr<RecordBatch> Make(
+      const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      const std::vector<std::shared_ptr<ArrayData>>& columns);
+
+  /// \brief Determine if two record batches are exactly equal
+  /// \return true if batches are equal
+  bool Equals(const RecordBatch& other) const;
+
+  /// \brief Determine if two record batches are approximately equal
+  bool ApproxEquals(const RecordBatch& other) const;
+
+  /// \return the record batch's schema
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+  /// \brief Retrieve an array from the record batch
+  /// \param[in] i field index, does not boundscheck
+  /// \return an Array object
+  virtual std::shared_ptr<Array> column(int i) const = 0;
+
+  /// \brief Retrieve an array's internal data from the record batch
+  /// \param[in] i field index, does not boundscheck
+  /// \return an internal ArrayData object
+  virtual std::shared_ptr<ArrayData> column_data(int i) const = 0;
+
+  virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+  /// \brief Name of the i-th column
+  const std::string& column_name(int i) const;
+
+  /// \return the number of columns in the record batch
+  int num_columns() const { return schema_->num_fields(); }
+
+  /// \return the number of rows (the corresponding length of each column)
+  int64_t num_rows() const { return num_rows_; }
+
+  /// \brief Slice each of the arrays in the record batch
+  /// \param[in] offset the starting offset to slice, through end of batch
+  /// \return new record batch
+  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
+
+  /// \brief Slice each of the arrays in the record batch
+  /// \param[in] offset the starting offset to slice
+  /// \param[in] length the number of elements to slice from offset
+  /// \return new record batch
+  virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0;
+
+  /// \brief Check for schema or length inconsistencies
+  /// \return Status
+  virtual Status Validate() const;
+
+ protected:
+  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
+
+  std::shared_ptr<Schema> schema_;
+  int64_t num_rows_;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
+};
+
+/// \brief Abstract interface for reading stream of record batches
+class ARROW_EXPORT RecordBatchReader {
+ public:
+  virtual ~RecordBatchReader();
+
+  /// \return the shared schema of the record batches in the stream
+  virtual std::shared_ptr<Schema> schema() const = 0;
+
+  /// Read the next record batch in the stream. Return null for batch when
+  /// reaching end of stream
+  ///
+  /// \param[out] batch the next loaded batch, null at end of stream
+  /// \return Status
+  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_RECORD_BATCH_H
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index b490310..e77d3aa 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -22,6 +22,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/test-common.h"
@@ -216,8 +217,8 @@ class TestTable : public TestBase {
 
 TEST_F(TestTable, EmptySchema) {
   auto empty_schema = ::arrow::schema({});
-  table_.reset(new Table(empty_schema, columns_));
-  ASSERT_OK(table_->ValidateColumns());
+  table_ = Table::Make(empty_schema, columns_);
+  ASSERT_OK(table_->Validate());
   ASSERT_EQ(0, table_->num_rows());
   ASSERT_EQ(0, table_->num_columns());
 }
@@ -226,20 +227,20 @@ TEST_F(TestTable, Ctors) {
   const int length = 100;
   MakeExample1(length);
 
-  table_.reset(new Table(schema_, columns_));
-  ASSERT_OK(table_->ValidateColumns());
+  table_ = Table::Make(schema_, columns_);
+  ASSERT_OK(table_->Validate());
   ASSERT_EQ(length, table_->num_rows());
   ASSERT_EQ(3, table_->num_columns());
 
-  auto array_ctor = std::make_shared<Table>(schema_, arrays_);
+  auto array_ctor = Table::Make(schema_, arrays_);
   ASSERT_TRUE(table_->Equals(*array_ctor));
 
-  table_.reset(new Table(schema_, columns_, length));
-  ASSERT_OK(table_->ValidateColumns());
+  table_ = Table::Make(schema_, columns_, length);
+  ASSERT_OK(table_->Validate());
   ASSERT_EQ(length, table_->num_rows());
 
-  ASSERT_OK(MakeTable(schema_, arrays_, &table_));
-  ASSERT_OK(table_->ValidateColumns());
+  table_ = Table::Make(schema_, arrays_);
+  ASSERT_OK(table_->Validate());
   ASSERT_EQ(length, table_->num_rows());
   ASSERT_EQ(3, table_->num_columns());
 }
@@ -248,7 +249,7 @@ TEST_F(TestTable, Metadata) {
   const int length = 100;
   MakeExample1(length);
 
-  table_.reset(new Table(schema_, columns_));
+  table_ = Table::Make(schema_, columns_);
 
   ASSERT_TRUE(table_->schema()->Equals(*schema_));
 
@@ -262,14 +263,14 @@ TEST_F(TestTable, InvalidColumns) {
   const int length = 100;
   MakeExample1(length);
 
-  table_.reset(new Table(schema_, columns_, length - 1));
-  ASSERT_RAISES(Invalid, table_->ValidateColumns());
+  table_ = Table::Make(schema_, columns_, length - 1);
+  ASSERT_RAISES(Invalid, table_->Validate());
 
   columns_.clear();
 
   // Wrong number of columns
-  table_.reset(new Table(schema_, columns_, length));
-  ASSERT_RAISES(Invalid, table_->ValidateColumns());
+  table_ = Table::Make(schema_, columns_, length);
+  ASSERT_RAISES(Invalid, table_->Validate());
 
   columns_ = {
       std::make_shared<Column>(schema_->field(0), MakeRandomArray<Int32Array>(length)),
@@ -277,15 +278,15 @@ TEST_F(TestTable, InvalidColumns) {
       std::make_shared<Column>(schema_->field(2),
                                MakeRandomArray<Int16Array>(length - 1))};
 
-  table_.reset(new Table(schema_, columns_, length));
-  ASSERT_RAISES(Invalid, table_->ValidateColumns());
+  table_ = Table::Make(schema_, columns_, length);
+  ASSERT_RAISES(Invalid, table_->Validate());
 }
 
 TEST_F(TestTable, Equals) {
   const int length = 100;
   MakeExample1(length);
 
-  table_.reset(new Table(schema_, columns_));
+  table_ = Table::Make(schema_, columns_);
 
   ASSERT_TRUE(table_->Equals(*table_));
   // Differing schema
@@ -294,7 +295,8 @@ TEST_F(TestTable, Equals) {
   auto f2 = field("f5", int16());
   vector<shared_ptr<Field>> fields = {f0, f1, f2};
   auto other_schema = std::make_shared<Schema>(fields);
-  ASSERT_FALSE(table_->Equals(Table(other_schema, columns_)));
+  auto other = Table::Make(other_schema, columns_);
+  ASSERT_FALSE(table_->Equals(*other));
   // Differing columns
   std::vector<std::shared_ptr<Column>> other_columns = {
       std::make_shared<Column>(schema_->field(0),
@@ -303,19 +305,21 @@ TEST_F(TestTable, Equals) {
                                MakeRandomArray<UInt8Array>(length, 10)),
       std::make_shared<Column>(schema_->field(2),
                                MakeRandomArray<Int16Array>(length, 10))};
-  ASSERT_FALSE(table_->Equals(Table(schema_, other_columns)));
+
+  other = Table::Make(schema_, other_columns);
+  ASSERT_FALSE(table_->Equals(*other));
 }
 
 TEST_F(TestTable, FromRecordBatches) {
   const int64_t length = 10;
   MakeExample1(length);
 
-  auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+  auto batch1 = RecordBatch::Make(schema_, length, arrays_);
 
   std::shared_ptr<Table> result, expected;
   ASSERT_OK(Table::FromRecordBatches({batch1}, &result));
 
-  expected = std::make_shared<Table>(schema_, columns_);
+  expected = Table::Make(schema_, columns_);
   ASSERT_TRUE(result->Equals(*expected));
 
   std::vector<std::shared_ptr<Column>> other_columns;
@@ -325,18 +329,17 @@ TEST_F(TestTable, FromRecordBatches) {
   }
 
   ASSERT_OK(Table::FromRecordBatches({batch1, batch1}, &result));
-  expected = std::make_shared<Table>(schema_, other_columns);
+  expected = Table::Make(schema_, other_columns);
   ASSERT_TRUE(result->Equals(*expected));
 
   // Error states
   std::vector<std::shared_ptr<RecordBatch>> empty_batches;
   ASSERT_RAISES(Invalid, Table::FromRecordBatches(empty_batches, &result));
 
-  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
-  auto other_schema = std::make_shared<Schema>(fields);
+  auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
 
   std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
-  auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  auto batch2 = RecordBatch::Make(other_schema, length, other_arrays);
   ASSERT_RAISES(Invalid, Table::FromRecordBatches({batch1, batch2}, &result));
 }
 
@@ -344,11 +347,11 @@ TEST_F(TestTable, ConcatenateTables) {
   const int64_t length = 10;
 
   MakeExample1(length);
-  auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+  auto batch1 = RecordBatch::Make(schema_, length, arrays_);
 
   // generate different data
   MakeExample1(length);
-  auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+  auto batch2 = RecordBatch::Make(schema_, length, arrays_);
 
   std::shared_ptr<Table> t1, t2, t3, result, expected;
   ASSERT_OK(Table::FromRecordBatches({batch1}, &t1));
@@ -362,11 +365,10 @@ TEST_F(TestTable, ConcatenateTables) {
   std::vector<std::shared_ptr<Table>> empty_tables;
   ASSERT_RAISES(Invalid, ConcatenateTables(empty_tables, &result));
 
-  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
-  auto other_schema = std::make_shared<Schema>(fields);
+  auto other_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
 
   std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
-  auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  auto batch3 = RecordBatch::Make(other_schema, length, other_arrays);
   ASSERT_OK(Table::FromRecordBatches({batch3}, &t3));
 
   ASSERT_RAISES(Invalid, ConcatenateTables({t1, t3}, &result));
@@ -376,31 +378,38 @@ TEST_F(TestTable, RemoveColumn) {
   const int64_t length = 10;
   MakeExample1(length);
 
-  Table table(schema_, columns_);
+  auto table_sp = Table::Make(schema_, columns_);
+  const Table& table = *table_sp;
 
   std::shared_ptr<Table> result;
   ASSERT_OK(table.RemoveColumn(0, &result));
 
   auto ex_schema = ::arrow::schema({schema_->field(1), schema_->field(2)});
   std::vector<std::shared_ptr<Column>> ex_columns = {table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  auto expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));
 
   ASSERT_OK(table.RemoveColumn(1, &result));
   ex_schema = ::arrow::schema({schema_->field(0), schema_->field(2)});
   ex_columns = {table.column(0), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));
 
   ASSERT_OK(table.RemoveColumn(2, &result));
   ex_schema = ::arrow::schema({schema_->field(0), schema_->field(1)});
   ex_columns = {table.column(0), table.column(1)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+  expected = Table::Make(ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(*expected));
 }
 
 TEST_F(TestTable, AddColumn) {
   const int64_t length = 10;
   MakeExample1(length);
 
-  Table table(schema_, columns_);
+  auto table_sp = Table::Make(schema_, columns_);
+  const Table& table = *table_sp;
 
   std::shared_ptr<Table> result;
   // Some negative tests with invalid index
@@ -419,50 +428,32 @@ TEST_F(TestTable, AddColumn) {
   ASSERT_OK(table.AddColumn(0, columns_[0], &result));
   auto ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)});
-  std::vector<std::shared_ptr<Column>> ex_columns = {table.column(0), table.column(0),
-                                                     table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  auto expected = Table::Make(
+      ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));
 
   ASSERT_OK(table.AddColumn(1, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(0), schema_->field(1), schema_->field(2)});
-  ex_columns = {table.column(0), table.column(0), table.column(1), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(0), table.column(1), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));
 
   ASSERT_OK(table.AddColumn(2, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(1), schema_->field(0), schema_->field(2)});
-  ex_columns = {table.column(0), table.column(1), table.column(0), table.column(2)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(1), table.column(0), table.column(2)});
+  ASSERT_TRUE(result->Equals(*expected));
 
   ASSERT_OK(table.AddColumn(3, columns_[0], &result));
   ex_schema = ::arrow::schema(
       {schema_->field(0), schema_->field(1), schema_->field(2), schema_->field(0)});
-  ex_columns = {table.column(0), table.column(1), table.column(2), table.column(0)};
-  ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
-}
-
-TEST_F(TestTable, IsChunked) {
-  ArrayVector c1, c2;
-
-  auto a1 = MakeRandomArray<Int32Array>(10);
-  auto a2 = MakeRandomArray<Int32Array>(20);
-
-  auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())});
-
-  std::vector<std::shared_ptr<Column>> columns;
-
-  std::shared_ptr<RecordBatch> batch;
-
-  columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})};
-  auto t1 = std::make_shared<Table>(sch1, columns);
-
-  ASSERT_FALSE(t1->IsChunked());
-
-  columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})};
-  auto t2 = std::make_shared<Table>(sch1, columns);
-
-  ASSERT_TRUE(t2->IsChunked());
+  expected = Table::Make(
+      ex_schema, {table.column(0), table.column(1), table.column(2), table.column(0)});
+  ASSERT_TRUE(result->Equals(*expected));
 }
 
 class TestRecordBatch : public TestBase {};
@@ -475,24 +466,22 @@ TEST_F(TestRecordBatch, Equals) {
   auto f2 = field("f2", int16());
 
   vector<shared_ptr<Field>> fields = {f0, f1, f2};
-  auto schema = std::make_shared<Schema>(fields);
+  auto schema = ::arrow::schema({f0, f1, f2});
+  auto schema2 = ::arrow::schema({f0, f1});
 
   auto a0 = MakeRandomArray<Int32Array>(length);
   auto a1 = MakeRandomArray<UInt8Array>(length);
   auto a2 = MakeRandomArray<Int16Array>(length);
 
-  RecordBatch b1(schema, length, {a0, a1, a2});
-  RecordBatch b3(schema, length, {a0, a1});
-  RecordBatch b4(schema, length, {a0, a1, a1});
+  auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
+  auto b3 = RecordBatch::Make(schema2, length, {a0, a1});
+  auto b4 = RecordBatch::Make(schema, length, {a0, a1, a1});
 
-  ASSERT_TRUE(b1.Equals(b1));
-  ASSERT_FALSE(b1.Equals(b3));
-  ASSERT_FALSE(b1.Equals(b4));
+  ASSERT_TRUE(b1->Equals(*b1));
+  ASSERT_FALSE(b1->Equals(*b3));
+  ASSERT_FALSE(b1->Equals(*b4));
 }
 
-#ifdef NDEBUG
-// In debug builds, RecordBatch ctor aborts if you construct an invalid one
-
 TEST_F(TestRecordBatch, Validate) {
   const int length = 10;
 
@@ -507,21 +496,19 @@ TEST_F(TestRecordBatch, Validate) {
   auto a2 = MakeRandomArray<Int16Array>(length);
   auto a3 = MakeRandomArray<Int16Array>(5);
 
-  RecordBatch b1(schema, length, {a0, a1, a2});
+  auto b1 = RecordBatch::Make(schema, length, {a0, a1, a2});
 
-  ASSERT_OK(b1.Validate());
+  ASSERT_OK(b1->Validate());
 
   // Length mismatch
-  RecordBatch b2(schema, length, {a0, a1, a3});
-  ASSERT_RAISES(Invalid, b2.Validate());
+  auto b2 = RecordBatch::Make(schema, length, {a0, a1, a3});
+  ASSERT_RAISES(Invalid, b2->Validate());
 
   // Type mismatch
-  RecordBatch b3(schema, length, {a0, a1, a0});
-  ASSERT_RAISES(Invalid, b3.Validate());
+  auto b3 = RecordBatch::Make(schema, length, {a0, a1, a0});
+  ASSERT_RAISES(Invalid, b3->Validate());
 }
 
-#endif
-
 TEST_F(TestRecordBatch, Slice) {
   const int length = 10;
 
@@ -529,19 +516,19 @@ TEST_F(TestRecordBatch, Slice) {
   auto f1 = field("f1", uint8());
 
   vector<shared_ptr<Field>> fields = {f0, f1};
-  auto schema = std::make_shared<Schema>(fields);
+  auto schema = ::arrow::schema(fields);
 
   auto a0 = MakeRandomArray<Int32Array>(length);
   auto a1 = MakeRandomArray<UInt8Array>(length);
 
-  RecordBatch batch(schema, length, {a0, a1});
+  auto batch = RecordBatch::Make(schema, length, {a0, a1});
 
-  auto batch_slice = batch.Slice(2);
-  auto batch_slice2 = batch.Slice(1, 5);
+  auto batch_slice = batch->Slice(2);
+  auto batch_slice2 = batch->Slice(1, 5);
 
-  ASSERT_EQ(batch_slice->num_rows(), batch.num_rows() - 2);
+  ASSERT_EQ(batch_slice->num_rows(), batch->num_rows() - 2);
 
-  for (int i = 0; i < batch.num_columns(); ++i) {
+  for (int i = 0; i < batch->num_columns(); ++i) {
     ASSERT_EQ(2, batch_slice->column(i)->offset());
     ASSERT_EQ(length - 2, batch_slice->column(i)->length());
 
@@ -567,9 +554,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
   std::shared_ptr<RecordBatch> batch;
 
   columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2, a2})};
-  Table t1(sch1, columns);
+  auto t1 = Table::Make(sch1, columns);
 
-  TableBatchReader i1(t1);
+  TableBatchReader i1(*t1);
 
   ASSERT_OK(i1.ReadNext(&batch));
   ASSERT_EQ(10, batch->num_rows());
@@ -584,9 +571,9 @@ TEST_F(TestTableBatchReader, ReadNext) {
   ASSERT_EQ(nullptr, batch);
 
   columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})};
-  Table t2(sch1, columns);
+  auto t2 = Table::Make(sch1, columns);
 
-  TableBatchReader i2(t2);
+  TableBatchReader i2(*t2);
 
   ASSERT_OK(i2.ReadNext(&batch));
   ASSERT_EQ(10, batch->num_rows());
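
A note on the test migration above: Table::Make and RecordBatch::Make never
check their inputs, so the InvalidColumns cases construct successfully and the
inconsistency is only reported by the explicit Validate() call, which replaces
the old ValidateColumns(). A minimal sketch of the migrated pattern, reusing
the fixture names schema_, columns_, and length from the tests above:

    // Construction cannot fail; Make() only wraps the inputs...
    std::shared_ptr<arrow::Table> table =
        arrow::Table::Make(schema_, columns_, /*num_rows=*/length - 1);
    // ...so the row-count mismatch surfaces lazily, through Validate():
    ASSERT_RAISES(Invalid, table->Validate());
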
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index fe19bf4..8f3f195 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -23,6 +23,7 @@
 #include <sstream>
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/util/logging.h"
@@ -153,171 +154,126 @@ Status Column::ValidateData() {
 }
 
 // ----------------------------------------------------------------------
-// RecordBatch methods
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows)
-    : schema_(schema), num_rows_(num_rows) {
-  boxed_columns_.resize(schema->num_fields());
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         const std::vector<std::shared_ptr<Array>>& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] = columns[i]->data();
-  }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         std::vector<std::shared_ptr<Array>>&& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] = columns[i]->data();
-  }
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         std::vector<std::shared_ptr<ArrayData>>&& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_ = std::move(columns);
-}
-
-RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         const std::vector<std::shared_ptr<ArrayData>>& columns)
-    : RecordBatch(schema, num_rows) {
-  columns_ = columns;
-}
-
-std::shared_ptr<Array> RecordBatch::column(int i) const {
-  if (!boxed_columns_[i]) {
-    boxed_columns_[i] = MakeArray(columns_[i]);
-  }
-  DCHECK(boxed_columns_[i]);
-  return boxed_columns_[i];
-}
-
-const std::string& RecordBatch::column_name(int i) const {
-  return schema_->field(i)->name();
-}
-
-bool RecordBatch::Equals(const RecordBatch& other) const {
-  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
-    return false;
-  }
+// Table methods
 
-  for (int i = 0; i < num_columns(); ++i) {
-    if (!column(i)->Equals(other.column(i))) {
-      return false;
+/// \class SimpleTable
+/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
+class SimpleTable : public Table {
+ public:
+  SimpleTable(const std::shared_ptr<Schema>& schema,
+              const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1)
+      : columns_(columns) {
+    schema_ = schema;
+    if (num_rows < 0) {
+      if (columns.size() == 0) {
+        num_rows_ = 0;
+      } else {
+        num_rows_ = columns[0]->length();
+      }
+    } else {
+      num_rows_ = num_rows;
     }
   }
 
-  return true;
-}
-
-bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
-  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
-    return false;
-  }
+  SimpleTable(const std::shared_ptr<Schema>& schema,
+              const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
+    schema_ = schema;
+    if (num_rows < 0) {
+      if (columns.size() == 0) {
+        num_rows_ = 0;
+      } else {
+        num_rows_ = columns[0]->length();
+      }
+    } else {
+      num_rows_ = num_rows;
+    }
 
-  for (int i = 0; i < num_columns(); ++i) {
-    if (!column(i)->ApproxEquals(other.column(i))) {
-      return false;
+    columns_.resize(columns.size());
+    for (size_t i = 0; i < columns.size(); ++i) {
+      columns_[i] =
+          std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]);
     }
   }
 
-  return true;
-}
-
-std::shared_ptr<RecordBatch> RecordBatch::ReplaceSchemaMetadata(
-    const std::shared_ptr<const KeyValueMetadata>& metadata) const {
-  auto new_schema = schema_->AddMetadata(metadata);
-  return std::make_shared<RecordBatch>(new_schema, num_rows_, columns_);
-}
+  std::shared_ptr<Column> column(int i) const override { return columns_[i]; }
 
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
-  return Slice(offset, this->num_rows() - offset);
-}
+  Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override {
+    std::shared_ptr<Schema> new_schema;
+    RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
 
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const {
-  std::vector<std::shared_ptr<ArrayData>> arrays;
-  arrays.reserve(num_columns());
-  for (const auto& field : columns_) {
-    int64_t col_length = std::min(field->length - offset, length);
-    int64_t col_offset = field->offset + offset;
-
-    auto new_data = std::make_shared<ArrayData>(*field);
-    new_data->length = col_length;
-    new_data->offset = col_offset;
-    new_data->null_count = kUnknownNullCount;
-    arrays.emplace_back(new_data);
+    *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i));
+    return Status::OK();
   }
-  int64_t num_rows = std::min(num_rows_ - offset, length);
-  return std::make_shared<RecordBatch>(schema_, num_rows, std::move(arrays));
-}
 
-Status RecordBatch::Validate() const {
-  for (int i = 0; i < num_columns(); ++i) {
-    const ArrayData& arr = *columns_[i];
-    if (arr.length != num_rows_) {
+  Status AddColumn(int i, const std::shared_ptr<Column>& col,
+                   std::shared_ptr<Table>* out) const override {
+    if (i < 0 || i > num_columns() + 1) {
+      return Status::Invalid("Invalid column index.");
+    }
+    if (col == nullptr) {
       std::stringstream ss;
-      ss << "Number of rows in column " << i << " did not match batch: " << 
arr.length
-         << " vs " << num_rows_;
+      ss << "Column " << i << " was null";
       return Status::Invalid(ss.str());
     }
-    const auto& schema_type = *schema_->field(i)->type();
-    if (!arr.type->Equals(schema_type)) {
+    if (col->length() != num_rows_) {
       std::stringstream ss;
-      ss << "Column " << i << " type not match schema: " << 
arr.type->ToString() << " vs "
-         << schema_type.ToString();
+      ss << "Added column's length must match table's length. Expected length "
+         << num_rows_ << " but got length " << col->length();
       return Status::Invalid(ss.str());
     }
+
+    std::shared_ptr<Schema> new_schema;
+    RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
+
+    *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col));
+    return Status::OK();
   }
-  return Status::OK();
-}
 
-// ----------------------------------------------------------------------
-// Table methods
+  std::shared_ptr<Table> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
+    auto new_schema = schema_->AddMetadata(metadata);
+    return Table::Make(new_schema, columns_);
+  }
 
-Table::Table(const std::shared_ptr<Schema>& schema,
-             const std::vector<std::shared_ptr<Column>>& columns, int64_t 
num_rows)
-    : schema_(schema), columns_(columns) {
-  if (num_rows < 0) {
-    if (columns.size() == 0) {
-      num_rows_ = 0;
-    } else {
-      num_rows_ = columns[0]->length();
+  Status Validate() const override {
+    if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
+      return Status::Invalid("Number of columns did not match schema");
     }
-  } else {
-    num_rows_ = num_rows;
-  }
-}
 
-Table::Table(const std::shared_ptr<Schema>& schema,
-             const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows)
-    : schema_(schema) {
-  if (num_rows < 0) {
-    if (columns.size() == 0) {
-      num_rows_ = 0;
-    } else {
-      num_rows_ = columns[0]->length();
+    // Make sure columns are all the same length
+    for (int i = 0; i < num_columns(); ++i) {
+      const Column* col = columns_[i].get();
+      if (col == nullptr) {
+        std::stringstream ss;
+        ss << "Column " << i << " was null";
+        return Status::Invalid(ss.str());
+      }
+      if (col->length() != num_rows_) {
+        std::stringstream ss;
+        ss << "Column " << i << " named " << col->name() << " expected length "
+           << num_rows_ << " but got length " << col->length();
+        return Status::Invalid(ss.str());
+      }
     }
-  } else {
-    num_rows_ = num_rows;
+    return Status::OK();
   }
 
-  columns_.resize(columns.size());
-  for (size_t i = 0; i < columns.size(); ++i) {
-    columns_[i] =
-        std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]);
-  }
+ private:
+  std::vector<std::shared_ptr<Column>> columns_;
+};
+
+Table::Table() {}
+
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+                                   const std::vector<std::shared_ptr<Column>>& columns,
+                                   int64_t num_rows) {
+  return std::make_shared<SimpleTable>(schema, columns, num_rows);
 }
 
-std::shared_ptr<Table> Table::ReplaceSchemaMetadata(
-    const std::shared_ptr<const KeyValueMetadata>& metadata) const {
-  auto new_schema = schema_->AddMetadata(metadata);
-  return std::make_shared<Table>(new_schema, columns_);
+std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
+                                   const std::vector<std::shared_ptr<Array>>& arrays,
+                                   int64_t num_rows) {
+  return std::make_shared<SimpleTable>(schema, arrays, num_rows);
 }
 
 Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -351,7 +307,7 @@ Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>&
     columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
   }
 
-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);
   return Status::OK();
 }
 
@@ -388,7 +344,7 @@ Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
     }
     columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
   }
-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);
   return Status::OK();
 }
 
@@ -399,82 +355,19 @@ bool Table::Equals(const Table& other) const {
   if (!schema_->Equals(*other.schema())) {
     return false;
   }
-  if (static_cast<int64_t>(columns_.size()) != other.num_columns()) {
+  if (this->num_columns() != other.num_columns()) {
     return false;
   }
 
-  for (int i = 0; i < static_cast<int>(columns_.size()); i++) {
-    if (!columns_[i]->Equals(other.column(i))) {
+  for (int i = 0; i < this->num_columns(); i++) {
+    if (!this->column(i)->Equals(other.column(i))) {
       return false;
     }
   }
   return true;
 }
 
-Status Table::RemoveColumn(int i, std::shared_ptr<Table>* out) const {
-  std::shared_ptr<Schema> new_schema;
-  RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
-
-  *out = std::make_shared<Table>(new_schema, internal::DeleteVectorElement(columns_, i));
-  return Status::OK();
-}
-
-Status Table::AddColumn(int i, const std::shared_ptr<Column>& col,
-                        std::shared_ptr<Table>* out) const {
-  if (i < 0 || i > num_columns() + 1) {
-    return Status::Invalid("Invalid column index.");
-  }
-  if (col == nullptr) {
-    std::stringstream ss;
-    ss << "Column " << i << " was null";
-    return Status::Invalid(ss.str());
-  }
-  if (col->length() != num_rows_) {
-    std::stringstream ss;
-    ss << "Added column's length must match table's length. Expected length " 
<< num_rows_
-       << " but got length " << col->length();
-    return Status::Invalid(ss.str());
-  }
-
-  std::shared_ptr<Schema> new_schema;
-  RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
-
-  *out =
-      std::make_shared<Table>(new_schema, internal::AddVectorElement(columns_, i, col));
-  return Status::OK();
-}
-
-Status Table::ValidateColumns() const {
-  if (num_columns() != schema_->num_fields()) {
-    return Status::Invalid("Number of columns did not match schema");
-  }
-
-  // Make sure columns are all the same length
-  for (size_t i = 0; i < columns_.size(); ++i) {
-    const Column* col = columns_[i].get();
-    if (col == nullptr) {
-      std::stringstream ss;
-      ss << "Column " << i << " was null";
-      return Status::Invalid(ss.str());
-    }
-    if (col->length() != num_rows_) {
-      std::stringstream ss;
-      ss << "Column " << i << " named " << col->name() << " expected length " 
<< num_rows_
-         << " but got length " << col->length();
-      return Status::Invalid(ss.str());
-    }
-  }
-  return Status::OK();
-}
-
-bool Table::IsChunked() const {
-  for (size_t i = 0; i < columns_.size(); ++i) {
-    if (columns_[i]->data()->num_chunks() > 1) {
-      return true;
-    }
-  }
-  return false;
-}
+#ifndef ARROW_NO_DEPRECATED_API
 
 Status MakeTable(const std::shared_ptr<Schema>& schema,
                  const std::vector<std::shared_ptr<Array>>& arrays,
@@ -493,15 +386,12 @@ Status MakeTable(const std::shared_ptr<Schema>& schema,
     columns.emplace_back(std::make_shared<Column>(schema->field(i), arrays[i]));
   }
 
-  *table = std::make_shared<Table>(schema, columns);
+  *table = Table::Make(schema, columns);
 
   return Status::OK();
 }
 
-// ----------------------------------------------------------------------
-// Base record batch reader
-
-RecordBatchReader::~RecordBatchReader() {}
+#endif  // ARROW_NO_DEPRECATED_API
 
 // ----------------------------------------------------------------------
 // Convert a table to a sequence of record batches
@@ -565,8 +455,7 @@ class TableBatchReader::TableBatchReaderImpl {
     }
 
     absolute_row_position_ += chunksize;
-    *out =
-        std::make_shared<RecordBatch>(table_.schema(), chunksize, std::move(batch_data));
+    *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));
 
     return Status::OK();
   }
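
With MakeTable now compiled only when ARROW_NO_DEPRECATED_API is left
undefined, downstream callers should move to the array-based factory. A
migration sketch, with schema and arrays standing in for a real schema and a
matching vector of arrays:

    std::shared_ptr<arrow::Table> table;
    // Deprecated since 0.8.0:
    //   RETURN_NOT_OK(arrow::MakeTable(schema, arrays, &table));
    // Equivalent factory call; each array is boxed into a Column internally:
    table = arrow::Table::Make(schema, arrays);
    RETURN_NOT_OK(table->Validate());
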
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2cff32f..d0312d9 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/type.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
@@ -33,8 +34,6 @@ namespace arrow {
 class KeyValueMetadata;
 class Status;
 
-using ArrayVector = std::vector<std::shared_ptr<Array>>;
-
 /// \class ChunkedArray
 /// \brief A data structure managing a list of primitive Arrow arrays logically
 /// as one large array
@@ -113,123 +112,28 @@ class ARROW_EXPORT Column {
   ARROW_DISALLOW_COPY_AND_ASSIGN(Column);
 };
 
-/// \class RecordBatch
-/// \brief Collection of equal-length arrays matching a particular Schema
-///
-/// A record batch is table-like data structure consisting of an internal
-/// sequence of fields, each a contiguous Arrow array
-class ARROW_EXPORT RecordBatch {
- public:
-  /// \param[in] schema The record batch schema
-  /// \param[in] num_rows length of fields in the record batch. Each array
-  /// should have the same length as num_rows
-  /// \param[in] columns the record batch fields as vector of arrays
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              const std::vector<std::shared_ptr<Array>>& columns);
-
-  /// \brief Move-based constructor for a vector of Array instances
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              std::vector<std::shared_ptr<Array>>&& columns);
-
-  /// \brief Construct record batch from vector of internal data structures
-  /// \since 0.5.0
-  ///
-  /// This class is only provided with an rvalue-reference for the input data,
-  /// and is intended for internal use, or advanced users.
-  ///
-  /// \param schema the record batch schema
-  /// \param num_rows the number of semantic rows in the record batch. This
-  /// should be equal to the length of each field
-  /// \param columns the data for the batch's columns
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              std::vector<std::shared_ptr<ArrayData>>&& columns);
-
-  /// \brief Construct record batch by copying vector of array data
-  /// \since 0.5.0
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-              const std::vector<std::shared_ptr<ArrayData>>& columns);
-
-  /// \brief Determine if two record batches are exactly equal
-  /// \return true if batches are equal
-  bool Equals(const RecordBatch& other) const;
-
-  /// \brief Determine if two record batches are approximately equal
-  bool ApproxEquals(const RecordBatch& other) const;
-
-  // \return the table's schema
-  /// \return true if batches are equal
-  std::shared_ptr<Schema> schema() const { return schema_; }
-
-  /// \brief Retrieve an array from the record batch
-  /// \param[in] i field index, does not boundscheck
-  /// \return an Array object
-  std::shared_ptr<Array> column(int i) const;
-
-  std::shared_ptr<ArrayData> column_data(int i) const { return columns_[i]; }
-
-  /// \brief Name in i-th column
-  const std::string& column_name(int i) const;
-
-  /// \return the number of columns in the table
-  int num_columns() const { return static_cast<int>(columns_.size()); }
-
-  /// \return the number of rows (the corresponding length of each column)
-  int64_t num_rows() const { return num_rows_; }
-
-  /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
-  /// \since 0.5.0
-  ///
-  /// \param[in] metadata new KeyValueMetadata
-  /// \return new RecordBatch
-  std::shared_ptr<RecordBatch> ReplaceSchemaMetadata(
-      const std::shared_ptr<const KeyValueMetadata>& metadata) const;
-
-  /// \brief Slice each of the arrays in the record batch
-  /// \param[in] offset the starting offset to slice, through end of batch
-  /// \return new record batch
-  std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
-
-  /// \brief Slice each of the arrays in the record batch
-  /// \param[in] offset the starting offset to slice
-  /// \param[in] length the number of elements to slice from offset
-  /// \return new record batch
-  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const;
-
-  /// \brief Check for schema or length inconsistencies
-  /// \return Status
-  Status Validate() const;
-
- private:
-  ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
-
-  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
-
-  std::shared_ptr<Schema> schema_;
-  int64_t num_rows_;
-  std::vector<std::shared_ptr<ArrayData>> columns_;
-
-  // Caching boxed array data
-  mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
-};
-
 /// \class Table
 /// \brief Logical table as sequence of chunked arrays
 class ARROW_EXPORT Table {
  public:
+  virtual ~Table() = default;
+
   /// \brief Construct Table from schema and columns
   /// If columns is zero-length, the table's number of rows is zero
   /// \param schema The table schema (column types)
   /// \param columns The table's columns
   /// \param num_rows number of rows in table, -1 (default) to infer from columns
-  Table(const std::shared_ptr<Schema>& schema,
-        const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1);
+  static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+                                     const std::vector<std::shared_ptr<Column>>& columns,
+                                     int64_t num_rows = -1);
 
   /// \brief Construct Table from schema and arrays
   /// \param schema The table schema (column types)
   /// \param arrays The table's columns as arrays
   /// \param num_rows number of rows in table, -1 (default) to infer from columns
-  Table(const std::shared_ptr<Schema>& schema,
-        const std::vector<std::shared_ptr<Array>>& arrays, int64_t num_rows = -1);
+  static std::shared_ptr<Table> Make(const std::shared_ptr<Schema>& schema,
+                                     const std::vector<std::shared_ptr<Array>>& arrays,
+                                     int64_t num_rows = -1);
 
   // Construct table from RecordBatch, but only if all of the batch schemas are
   // equal. Returns Status::Invalid if there is some problem
@@ -242,25 +146,28 @@ class ARROW_EXPORT Table {
 
   /// \param[in] i column index, does not boundscheck
   /// \return the i-th column
-  std::shared_ptr<Column> column(int i) const { return columns_[i]; }
+  virtual std::shared_ptr<Column> column(int i) const = 0;
 
   /// \brief Remove column from the table, producing a new Table
-  Status RemoveColumn(int i, std::shared_ptr<Table>* out) const;
+  virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0;
 
   /// \brief Add column to the table, producing a new Table
-  Status AddColumn(int i, const std::shared_ptr<Column>& column,
-                   std::shared_ptr<Table>* out) const;
+  virtual Status AddColumn(int i, const std::shared_ptr<Column>& column,
+                           std::shared_ptr<Table>* out) const = 0;
 
   /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL)
   /// \since 0.5.0
   ///
   /// \param[in] metadata new KeyValueMetadata
   /// \return new Table
-  std::shared_ptr<Table> ReplaceSchemaMetadata(
-      const std::shared_ptr<const KeyValueMetadata>& metadata) const;
+  virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+
+  /// \brief Perform any checks to validate the input arguments
+  virtual Status Validate() const = 0;
 
   /// \return the number of columns in the table
-  int num_columns() const { return static_cast<int>(columns_.size()); }
+  int num_columns() const { return schema_->num_fields(); }
 
   /// \return the number of rows (the corresponding length of each column)
   int64_t num_rows() const { return num_rows_; }
@@ -268,35 +175,14 @@ class ARROW_EXPORT Table {
   /// \brief Determine if semantic contents of tables are exactly equal
   bool Equals(const Table& other) const;
 
-  /// \brief Perform any checks to validate the input arguments
-  Status ValidateColumns() const;
-
-  /// \brief Return true if any column has multiple chunks
-  bool IsChunked() const;
-
- private:
-  ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
+ protected:
+  Table();
 
   std::shared_ptr<Schema> schema_;
-  std::vector<std::shared_ptr<Column>> columns_;
-
   int64_t num_rows_;
-};
-
-/// \brief Abstract interface for reading stream of record batches
-class ARROW_EXPORT RecordBatchReader {
- public:
-  virtual ~RecordBatchReader();
 
-  /// \return the shared schema of the record batches in the stream
-  virtual std::shared_ptr<Schema> schema() const = 0;
-
-  /// Read the next record batch in the stream. Return null for batch when
-  /// reaching end of stream
-  ///
-  /// \param[out] batch the next loaded batch, null at end of stream
-  /// \return Status
-  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Table);
 };
 
 /// \brief Compute a sequence of record batches from a (possibly chunked) Table
@@ -322,13 +208,18 @@ ARROW_EXPORT
 Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
                          std::shared_ptr<Table>* table);
 
+#ifndef ARROW_NO_DEPRECATED_API
+
 /// \brief Construct table from multiple input tables.
 /// \return Status, fails if any schemas are different
+/// \note Deprecated since 0.8.0
 ARROW_EXPORT
 Status MakeTable(const std::shared_ptr<Schema>& schema,
                  const std::vector<std::shared_ptr<Array>>& arrays,
                  std::shared_ptr<Table>* table);
 
+#endif
+
 }  // namespace arrow
 
 #endif  // ARROW_TABLE_H
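
With column access now virtual, a Table implementation can defer materializing
its columns until they are first requested. The following is a hypothetical
sketch, not part of this patch: LazyTable and its ColumnLoader callback are
invented names, and the structural operations simply fall back to a table
materialized with Table::Make.

    #include <functional>
    #include <memory>
    #include <utility>
    #include <vector>

    #include "arrow/table.h"

    class LazyTable : public arrow::Table {
     public:
      using ColumnLoader = std::function<std::shared_ptr<arrow::Column>(int)>;

      LazyTable(std::shared_ptr<arrow::Schema> schema, int64_t num_rows,
                ColumnLoader loader)
          : loader_(std::move(loader)), cache_(schema->num_fields()) {
        schema_ = std::move(schema);  // protected members of arrow::Table
        num_rows_ = num_rows;
      }

      // Materialize a column only on first access
      std::shared_ptr<arrow::Column> column(int i) const override {
        if (cache_[i] == nullptr) {
          cache_[i] = loader_(i);
        }
        return cache_[i];
      }

      arrow::Status RemoveColumn(int i,
                                 std::shared_ptr<arrow::Table>* out) const override {
        return Materialized()->RemoveColumn(i, out);
      }

      arrow::Status AddColumn(int i, const std::shared_ptr<arrow::Column>& col,
                              std::shared_ptr<arrow::Table>* out) const override {
        return Materialized()->AddColumn(i, col, out);
      }

      std::shared_ptr<arrow::Table> ReplaceSchemaMetadata(
          const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
          const override {
        return Materialized()->ReplaceSchemaMetadata(metadata);
      }

      arrow::Status Validate() const override { return Materialized()->Validate(); }

     private:
      // Structural operations delegate to a fully materialized SimpleTable
      std::shared_ptr<arrow::Table> Materialized() const {
        std::vector<std::shared_ptr<arrow::Column>> columns;
        for (int i = 0; i < num_columns(); ++i) {
          columns.push_back(column(i));
        }
        return arrow::Table::Make(schema_, columns, num_rows_);
      }

      ColumnLoader loader_;
      mutable std::vector<std::shared_ptr<arrow::Column>> cache_;
    };
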
diff --git a/cpp/src/arrow/table_builder-test.cc b/cpp/src/arrow/table_builder-test.cc
index 07d9b6b..8167577 100644
--- a/cpp/src/arrow/table_builder-test.cc
+++ b/cpp/src/arrow/table_builder-test.cc
@@ -22,6 +22,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/array.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/table_builder.h"
@@ -98,7 +99,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
   ASSERT_OK(ex_b1.Finish(&a1));
   ASSERT_OK(ex_b2.Finish(&a2));
 
-  RecordBatch expected(schema, 4, {a0, a1, a2});
+  auto expected = RecordBatch::Make(schema, 4, {a0, a1, a2});
 
   // Builder attributes
   ASSERT_EQ(3, builder->num_fields());
@@ -119,7 +120,7 @@ TEST_F(TestRecordBatchBuilder, Basics) {
       ASSERT_OK(builder->Flush(&batch));
     }
 
-    ASSERT_BATCHES_EQUAL(expected, *batch);
+    ASSERT_BATCHES_EQUAL(*expected, *batch);
   }
 
   // Test setting initial capacity
diff --git a/cpp/src/arrow/table_builder.cc b/cpp/src/arrow/table_builder.cc
index a1bd959..379d886 100644
--- a/cpp/src/arrow/table_builder.cc
+++ b/cpp/src/arrow/table_builder.cc
@@ -24,6 +24,7 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -64,7 +65,7 @@ Status RecordBatchBuilder::Flush(bool reset_builders,
     }
     length = fields[i]->length();
   }
-  *batch = std::make_shared<RecordBatch>(schema_, length, std::move(fields));
+  *batch = RecordBatch::Make(schema_, length, std::move(fields));
   if (reset_builders) {
     return InitBuilders();
   } else {
diff --git a/cpp/src/arrow/test-common.h b/cpp/src/arrow/test-common.h
index a4c4fdd..911adf7 100644
--- a/cpp/src/arrow/test-common.h
+++ b/cpp/src/arrow/test-common.h
@@ -30,7 +30,6 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 77f489a..1a34808 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -35,7 +35,6 @@
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -375,7 +374,7 @@ void AssertArraysEqual(const Array& expected, const Array& actual) {
 
 #define ASSERT_BATCHES_EQUAL(LEFT, RIGHT)    \
   do {                                       \
-    if (!LEFT.ApproxEquals(RIGHT)) {         \
+    if (!(LEFT).ApproxEquals(RIGHT)) {       \
       std::stringstream ss;                  \
       ss << "Left:\n";                       \
       ASSERT_OK(PrettyPrint(LEFT, 0, &ss));  \
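
The added parentheses around LEFT matter because member access binds more
tightly than unary dereference, and call sites now pass dereferenced
shared_ptrs such as *expected (see the table_builder test above):

    // With ASSERT_BATCHES_EQUAL(*expected, *batch):
    //   LEFT.ApproxEquals(RIGHT)    // *expected.ApproxEquals(*batch),
    //                               // which parses as *(expected.ApproxEquals(...))
    //   (LEFT).ApproxEquals(RIGHT)  // (*expected).ApproxEquals(*batch), as intended
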
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 70f275c..8dcc159 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -498,9 +498,9 @@ class ARROW_EXPORT StructType : public NestedType {
   std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
-class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
+class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
  public:
-  explicit DecimalBaseType(int32_t byte_width, int32_t precision, int32_t scale)
+  explicit DecimalType(int32_t byte_width, int32_t precision, int32_t scale)
       : FixedSizeBinaryType(byte_width, Type::DECIMAL),
         precision_(precision),
         scale_(scale) {}
@@ -513,21 +513,18 @@ class ARROW_EXPORT DecimalBaseType : public FixedSizeBinaryType {
   int32_t scale_;
 };
 
-class ARROW_EXPORT Decimal128Type : public DecimalBaseType {
+class ARROW_EXPORT Decimal128Type : public DecimalType {
  public:
   static constexpr Type::type type_id = Type::DECIMAL;
 
   explicit Decimal128Type(int32_t precision, int32_t scale)
-      : DecimalBaseType(16, precision, scale) {}
+      : DecimalType(16, precision, scale) {}
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   std::string name() const override { return "decimal"; }
 };
 
-// TODO(wesm): Remove this
-using DecimalType = Decimal128Type;
-
 struct UnionMode {
   enum type { SPARSE, DENSE };
 };
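
The decimal change above inverts the old relationship: DecimalType is now the
base class (formerly DecimalBaseType), and the alias that made DecimalType a
synonym for Decimal128Type is gone, so two-argument construction must name the
concrete type. A sketch, with arbitrary precision and scale values:

    // Before this patch, DecimalType aliased Decimal128Type, so this compiled:
    //   auto ty = std::make_shared<arrow::DecimalType>(38, 4);
    // Now DecimalType's constructor takes a byte width; use the 128-bit type:
    auto ty = std::make_shared<arrow::Decimal128Type>(/*precision=*/38, /*scale=*/4);
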
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index dbfd89c..73e34c7 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -403,8 +403,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CChunkedArray] data()
 
     cdef cppclass CRecordBatch" arrow::RecordBatch":
-        CRecordBatch(const shared_ptr[CSchema]& schema, int64_t num_rows,
-                     const vector[shared_ptr[CArray]]& columns)
+        @staticmethod
+        shared_ptr[CRecordBatch] Make(
+            const shared_ptr[CSchema]& schema, int64_t num_rows,
+            const vector[shared_ptr[CArray]]& columns)
 
         c_bool Equals(const CRecordBatch& other)
 
@@ -428,6 +430,11 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
                const vector[shared_ptr[CColumn]]& columns)
 
         @staticmethod
+        shared_ptr[CTable] Make(
+            const shared_ptr[CSchema]& schema,
+            const vector[shared_ptr[CColumn]]& columns)
+
+        @staticmethod
         CStatus FromRecordBatches(
             const vector[shared_ptr[CRecordBatch]]& batches,
             shared_ptr[CTable]* table)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 591f329..8c5b8bb 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -724,7 +724,6 @@ cdef class RecordBatch:
             Array arr
             c_string c_name
             shared_ptr[CSchema] schema
-            shared_ptr[CRecordBatch] batch
             vector[shared_ptr[CArray]] c_arrays
             int64_t num_rows
             int64_t i
@@ -740,8 +739,8 @@ cdef class RecordBatch:
         for arr in arrays:
             c_arrays.push_back(arr.sp_array)
 
-        batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
-        return pyarrow_wrap_batch(batch)
+        return pyarrow_wrap_batch(
+            CRecordBatch.Make(schema, num_rows, c_arrays))
 
 
 def table_to_blocks(PandasOptions options, Table table, int nthreads,
@@ -946,8 +945,7 @@ cdef class Table:
             else:
                 raise ValueError(type(arrays[i]))
 
-        table.reset(new CTable(c_schema, columns))
-        return pyarrow_wrap_table(table)
+        return pyarrow_wrap_table(CTable.Make(c_schema, columns))
 
     @staticmethod
     def from_batches(batches):

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <commits@arrow.apache.org>'].
