This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7184c3f  ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
7184c3f is described below

commit 7184c3f46981dd52c3c521b2676796e82f17da77
Author: Romain Francois <[email protected]>
AuthorDate: Mon Mar 1 16:02:56 2021 -0800

    ARROW-10570: [R] Use Converter API to convert SEXP to Array/ChunkedArray
    
    Closes #8650 from romainfrancois/RConverter
    
    Lead-authored-by: Romain Francois <[email protected]>
    Co-authored-by: Krisztián Szűcs <[email protected]>
    Signed-off-by: Neal Richardson <[email protected]>
---
 cpp/src/arrow/python/python_to_arrow.cc |   82 +-
 cpp/src/arrow/util/converter.h          |   38 +-
 r/R/array.R                             |    2 +-
 r/R/arrowExports.R                      |   32 +-
 r/src/array_from_vector.cpp             | 1599 -------------------------------
 r/src/array_to_vector.cpp               |    4 +-
 r/src/arrowExports.cpp                  |   80 +-
 r/src/arrow_cpp11.h                     |    1 +
 r/src/arrow_types.h                     |   19 +-
 r/src/chunkedarray.cpp                  |   42 +
 r/src/r_to_arrow.cpp                    | 1054 ++++++++++++++++++++
 r/src/recordbatch.cpp                   |    4 +-
 r/src/table.cpp                         |    2 +-
 r/src/type_infer.cpp                    |  202 ++++
 r/tests/testthat/test-Array.R           |   30 +-
 r/tests/testthat/test-chunked-array.R   |   24 +-
 16 files changed, 1475 insertions(+), 1740 deletions(-)

diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index b136bec..b2d9f1c 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -388,36 +388,36 @@ class PyValue {
   }
 };
 
-template <typename T>
-Status Extend(T* converter, PyObject* values, int64_t size) {
-  /// Ensure we've allocated enough space
-  RETURN_NOT_OK(converter->Reserve(size));
-  // Iterate over the items adding each one
-  return internal::VisitSequence(values, [converter](PyObject* item, bool* /* unused */) {
-    return converter->Append(item);
-  });
-}
-
-// Convert and append a sequence of values masked with a numpy array
-template <typename T>
-Status ExtendMasked(T* converter, PyObject* values, PyObject* mask, int64_t size) {
-  /// Ensure we've allocated enough space
-  RETURN_NOT_OK(converter->Reserve(size));
-  // Iterate over the items adding each one
-  return internal::VisitSequenceMasked(
-      values, mask, [converter](PyObject* item, bool is_masked, bool* /* unused */) {
-        if (is_masked) {
-          return converter->AppendNull();
-        } else {
-          // This will also apply the null-checking convention in the event
-          // that the value is not masked
-          return converter->Append(item);  // perhaps use AppendValue instead?
-        }
-      });
-}
-
 // The base Converter class is a mixin with predefined behavior and constructors.
-using PyConverter = Converter<PyObject*, PyConversionOptions>;
+class PyConverter : public Converter<PyObject*, PyConversionOptions> {
+ public:
+  // Iterate over the input values and defer the conversion to the Append method
+  Status Extend(PyObject* values, int64_t size) override {
+    /// Ensure we've allocated enough space
+    RETURN_NOT_OK(this->Reserve(size));
+    // Iterate over the items adding each one
+    return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) {
+      return this->Append(item);
+    });
+  }
+
+  // Convert and append a sequence of values masked with a numpy array
+  Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override {
+    /// Ensure we've allocated enough space
+    RETURN_NOT_OK(this->Reserve(size));
+    // Iterate over the items adding each one
+    return internal::VisitSequenceMasked(
+        values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) {
+          if (is_masked) {
+            return this->AppendNull();
+          } else {
+            // This will also apply the null-checking convention in the event
+            // that the value is not masked
+            return this->Append(item);  // perhaps use AppendValue instead?
+          }
+        });
+  }
+};
 
 template <typename T, typename Enable = void>
 class PyPrimitiveConverter;
@@ -669,7 +669,7 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
   Status AppendSequence(PyObject* value) {
     int64_t size = static_cast<int64_t>(PySequence_Size(value));
     RETURN_NOT_OK(this->list_builder_->ValidateOverflow(size));
-    return Extend(this->value_converter_.get(), value, size);
+    return this->value_converter_->Extend(value, size);
   }
 
   Status AppendNdarray(PyObject* value) {
@@ -684,12 +684,12 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
     switch (value_type->id()) {
 // If the value type does not match the expected NumPy dtype, then fall through
 // to a slower PySequence-based path
-#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE)               \
-  case Type::TYPE_ID: {                                         \
-    if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) {       \
-      return Extend(this->value_converter_.get(), value, size); \
-    }                                                           \
-    return AppendNdarrayTyped<TYPE, NUMPY_TYPE>(ndarray);       \
+#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE)         \
+  case Type::TYPE_ID: {                                   \
+    if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \
+      return this->value_converter_->Extend(value, size); \
+    }                                                     \
+    return AppendNdarrayTyped<TYPE, NUMPY_TYPE>(ndarray); \
   }
       LIST_FAST_CASE(BOOL, BooleanType, NPY_BOOL)
       LIST_FAST_CASE(UINT8, UInt8Type, NPY_UINT8)
@@ -707,7 +707,7 @@ class PyListConverter : public ListConverter<T, PyConverter, PyConverterTrait> {
       LIST_FAST_CASE(DURATION, DurationType, NPY_TIMEDELTA)
 #undef LIST_FAST_CASE
       default: {
-        return Extend(this->value_converter_.get(), value, size);
+        return this->value_converter_->Extend(value, size);
       }
     }
   }
@@ -1041,18 +1041,18 @@ Result<std::shared_ptr<ChunkedArray>> ConvertPySequence(PyObject* obj, PyObject*
     // the overflow and automatically creates new chunks.
     ARROW_ASSIGN_OR_RAISE(auto chunked_converter, MakeChunker(std::move(converter)));
     if (mask != nullptr && mask != Py_None) {
-      RETURN_NOT_OK(ExtendMasked(chunked_converter.get(), seq, mask, size));
+      RETURN_NOT_OK(chunked_converter->ExtendMasked(seq, mask, size));
     } else {
-      RETURN_NOT_OK(Extend(chunked_converter.get(), seq, size));
+      RETURN_NOT_OK(chunked_converter->Extend(seq, size));
     }
     return chunked_converter->ToChunkedArray();
   } else {
     // If the converter can't overflow spare the capacity error checking on the hot-path,
     // this improves the performance roughly by ~10% for primitive types.
     if (mask != nullptr && mask != Py_None) {
-      RETURN_NOT_OK(ExtendMasked(converter.get(), seq, mask, size));
+      RETURN_NOT_OK(converter->ExtendMasked(seq, mask, size));
     } else {
-      RETURN_NOT_OK(Extend(converter.get(), seq, size));
+      RETURN_NOT_OK(converter->Extend(seq, size));
     }
     return converter->ToChunkedArray();
   }
diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h
index e18f6e3..2c40a48 100644
--- a/cpp/src/arrow/util/converter.h
+++ b/cpp/src/arrow/util/converter.h
@@ -52,7 +52,15 @@ class Converter {
     return Init(pool);
   }
 
-  virtual Status Append(InputType value) = 0;
+  virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
+
+  virtual Status Extend(InputType values, int64_t size) {
+    return Status::NotImplemented("Extend");
+  }
+
+  virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) {
+    return Status::NotImplemented("ExtendMasked");
+  }
 
   const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }
 
@@ -294,6 +302,34 @@ class Chunker {
     return status;
   }
 
+  // we could get bit smarter here since the whole batch of appendable values
+  // will be rejected if a capacity error is raised
+  Status Extend(InputType values, int64_t size) {
+    auto status = converter_->Extend(values, size);
+    if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
+      if (converter_->builder()->length() == 0) {
+        return status;
+      }
+      ARROW_RETURN_NOT_OK(FinishChunk());
+      return Extend(values, size);
+    }
+    length_ += size;
+    return status;
+  }
+
+  Status ExtendMasked(InputType values, InputType mask, int64_t size) {
+    auto status = converter_->ExtendMasked(values, mask, size);
+    if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
+      if (converter_->builder()->length() == 0) {
+        return status;
+      }
+      ARROW_RETURN_NOT_OK(FinishChunk());
+      return ExtendMasked(values, mask, size);
+    }
+    length_ += size;
+    return status;
+  }
+
   Status FinishChunk() {
     ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
     chunks_.push_back(chunk);
diff --git a/r/R/array.R b/r/R/array.R
index acb612b..ed0e898 100644
--- a/r/R/array.R
+++ b/r/R/array.R
@@ -150,7 +150,7 @@ Array$create <- function(x, type = NULL) {
   if (!is.null(type)) {
     type <- as_type(type)
   }
-  Array__from_vector(x, type)
+  vec_to_arrow(x, type)
 }
 
 #' @rdname array
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 790232c..4f4bf1f 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -136,22 +136,6 @@ LargeListArray__raw_value_offsets <- function(array){
     .Call(`_arrow_LargeListArray__raw_value_offsets`, array)
 }
 
-Array__infer_type <- function(x){
-    .Call(`_arrow_Array__infer_type`, x)
-}
-
-Array__from_vector <- function(x, s_type){
-    .Call(`_arrow_Array__from_vector`, x, s_type)
-}
-
-ChunkedArray__from_list <- function(chunks, s_type){
-    .Call(`_arrow_ChunkedArray__from_list`, chunks, s_type)
-}
-
-DictionaryArray__FromArrays <- function(type, indices, dict){
-    .Call(`_arrow_DictionaryArray__FromArrays`, type, indices, dict)
-}
-
 Array__as_vector <- function(array){
     .Call(`_arrow_Array__as_vector`, array)
 }
@@ -264,6 +248,10 @@ ChunkedArray__ToString <- function(x){
     .Call(`_arrow_ChunkedArray__ToString`, x)
 }
 
+ChunkedArray__from_list <- function(chunks, s_type){
+    .Call(`_arrow_ChunkedArray__from_list`, chunks, s_type)
+}
+
 util___Codec__Create <- function(codec, compression_level){
     .Call(`_arrow_util___Codec__Create`, codec, compression_level)
 }
@@ -1280,6 +1268,14 @@ ExportRecordBatch <- function(batch, array_ptr, schema_ptr){
     invisible(.Call(`_arrow_ExportRecordBatch`, batch, array_ptr, schema_ptr))
 }
 
+vec_to_arrow <- function(x, s_type){
+    .Call(`_arrow_vec_to_arrow`, x, s_type)
+}
+
+DictionaryArray__FromArrays <- function(type, indices, dict){
+    .Call(`_arrow_DictionaryArray__FromArrays`, type, indices, dict)
+}
+
 RecordBatch__num_columns <- function(x){
     .Call(`_arrow_RecordBatch__num_columns`, x)
 }
@@ -1600,5 +1596,9 @@ SetCpuThreadPoolCapacity <- function(threads){
     invisible(.Call(`_arrow_SetCpuThreadPoolCapacity`, threads))
 }
 
+Array__infer_type <- function(x){
+    .Call(`_arrow_Array__infer_type`, x)
+}
+
 
 
diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp
deleted file mode 100644
index c2d2286..0000000
--- a/r/src/array_from_vector.cpp
+++ /dev/null
@@ -1,1599 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <memory>
-
-#include "./arrow_types.h"
-#include "./arrow_vctrs.h"
-
-#if defined(ARROW_R_WITH_ARROW)
-#include <arrow/array/array_base.h>
-#include <arrow/builder.h>
-#include <arrow/chunked_array.h>
-#include <arrow/util/bitmap_writer.h>
-#include <arrow/visitor_inline.h>
-
-using arrow::internal::checked_cast;
-
-namespace arrow {
-namespace r {
-
-template <typename T>
-inline bool is_na(T value) {
-  return false;
-}
-
-template <>
-inline bool is_na<int64_t>(int64_t value) {
-  return value == NA_INT64;
-}
-
-template <>
-inline bool is_na<double>(double value) {
-  return ISNA(value);
-}
-
-template <>
-inline bool is_na<int>(int value) {
-  return value == NA_INTEGER;
-}
-
-struct VectorToArrayConverter {
-  Status Visit(const arrow::NullType& type) {
-    auto* null_builder = checked_cast<NullBuilder*>(builder);
-    return null_builder->AppendNulls(XLENGTH(x));
-  }
-
-  Status Visit(const arrow::BooleanType& type) {
-    ARROW_RETURN_IF(TYPEOF(x) != LGLSXP, Status::RError("Expecting a logical 
vector"));
-    R_xlen_t n = XLENGTH(x);
-
-    auto* bool_builder = checked_cast<BooleanBuilder*>(builder);
-    auto* p = LOGICAL(x);
-
-    RETURN_NOT_OK(bool_builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      auto value = p[i];
-      if (value == NA_LOGICAL) {
-        bool_builder->UnsafeAppendNull();
-      } else {
-        bool_builder->UnsafeAppend(value == 1);
-      }
-    }
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::Int32Type& type) {
-    ARROW_RETURN_IF(TYPEOF(x) != INTSXP, Status::RError("Expecting an integer 
vector"));
-
-    auto* int_builder = checked_cast<Int32Builder*>(builder);
-
-    R_xlen_t n = XLENGTH(x);
-    const auto* data = INTEGER(x);
-
-    RETURN_NOT_OK(int_builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      const auto value = data[i];
-      if (value == NA_INTEGER) {
-        int_builder->UnsafeAppendNull();
-      } else {
-        int_builder->UnsafeAppend(value);
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::Int64Type& type) {
-    ARROW_RETURN_IF(TYPEOF(x) != REALSXP, Status::RError("Expecting a numeric 
vector"));
-    ARROW_RETURN_IF(Rf_inherits(x, "integer64"),
-                    Status::RError("Expecting a vector that inherits 
integer64"));
-
-    auto* int_builder = checked_cast<Int64Builder*>(builder);
-
-    R_xlen_t n = XLENGTH(x);
-    const auto* data = (REAL(x));
-
-    RETURN_NOT_OK(int_builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      const auto value = arrow::util::SafeCopy<int64_t>(data[i]);
-      if (value == NA_INT64) {
-        int_builder->UnsafeAppendNull();
-      } else {
-        int_builder->UnsafeAppend(value);
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::DoubleType& type) {
-    ARROW_RETURN_IF(TYPEOF(x) != REALSXP, Status::RError("Expecting a numeric 
vector"));
-
-    auto* double_builder = checked_cast<DoubleBuilder*>(builder);
-
-    R_xlen_t n = XLENGTH(x);
-    const auto* data = (REAL(x));
-
-    RETURN_NOT_OK(double_builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      const auto value = data[i];
-      if (ISNA(value)) {
-        double_builder->UnsafeAppendNull();
-      } else {
-        double_builder->UnsafeAppend(value);
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::BinaryType& type) {
-    if (!(Rf_inherits(x, "vctrs_list_of") &&
-          TYPEOF(Rf_getAttrib(x, symbols::ptype)) == RAWSXP)) {
-      return Status::RError("Expecting a list of raw vectors");
-    }
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::FixedSizeBinaryType& type) {
-    if (!(Rf_inherits(x, "vctrs_list_of") &&
-          TYPEOF(Rf_getAttrib(x, symbols::ptype)) == RAWSXP)) {
-      return Status::RError("Expecting a list of raw vectors");
-    }
-
-    return Status::OK();
-  }
-
-  template <typename T>
-  arrow::enable_if_base_binary<T, Status> Visit(const T& type) {
-    using BuilderType = typename TypeTraits<T>::BuilderType;
-
-    ARROW_RETURN_IF(TYPEOF(x) != STRSXP, Status::RError("Expecting a character 
vector"));
-
-    auto* binary_builder = checked_cast<BuilderType*>(builder);
-
-    R_xlen_t n = XLENGTH(x);
-    RETURN_NOT_OK(builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP si = STRING_ELT(x, i);
-      if (si == NA_STRING) {
-        RETURN_NOT_OK(binary_builder->AppendNull());
-        continue;
-      }
-      std::string s = cpp11::r_string(si);
-      RETURN_NOT_OK(binary_builder->Append(s.c_str(), s.size()));
-    }
-
-    return Status::OK();
-  }
-
-  template <typename T>
-  arrow::enable_if_base_list<T, Status> Visit(const T& type) {
-    using BuilderType = typename TypeTraits<T>::BuilderType;
-
-    ARROW_RETURN_IF(TYPEOF(x) != VECSXP, Status::RError("Expecting a list 
vector"));
-
-    auto* list_builder = checked_cast<BuilderType*>(builder);
-    auto* value_builder = list_builder->value_builder();
-    auto value_type = type.value_type();
-
-    R_xlen_t n = XLENGTH(x);
-    RETURN_NOT_OK(builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP vector = VECTOR_ELT(x, i);
-      if (Rf_isNull(vector)) {
-        RETURN_NOT_OK(list_builder->AppendNull());
-        continue;
-      }
-
-      RETURN_NOT_OK(list_builder->Append());
-
-      // Recurse.
-      VectorToArrayConverter converter{vector, value_builder};
-      Status status = arrow::VisitTypeInline(*value_type, &converter);
-      if (!status.ok()) {
-        return Status::RError("Cannot convert list element ", (i + 1),
-                              " to an Array of type `", value_type->ToString(),
-                              "` : ", status.message());
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status Visit(const FixedSizeListType& type) {
-    ARROW_RETURN_IF(TYPEOF(x) != VECSXP, Status::RError("Expecting a list 
vector"));
-
-    auto* fixed_size_list_builder = 
checked_cast<FixedSizeListBuilder*>(builder);
-    auto* value_builder = fixed_size_list_builder->value_builder();
-    auto value_type = type.value_type();
-    int list_size = type.list_size();
-
-    R_xlen_t n = XLENGTH(x);
-    RETURN_NOT_OK(builder->Reserve(n));
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP vector = VECTOR_ELT(x, i);
-      if (Rf_isNull(vector)) {
-        RETURN_NOT_OK(fixed_size_list_builder->AppendNull());
-        continue;
-      }
-      RETURN_NOT_OK(fixed_size_list_builder->Append());
-
-      auto vect_type = arrow::r::InferArrowType(vector);
-      if (!value_type->Equals(vect_type)) {
-        return Status::RError("FixedSizeList vector expecting elements vector 
of type ",
-                              value_type->ToString(), " but got ", 
vect_type->ToString());
-      }
-      int vector_size = vctrs::short_vec_size(vector);
-      if (vector_size != list_size) {
-        return Status::RError("FixedSizeList vector expecting elements vector 
of size ",
-                              list_size, ", not ", vector_size);
-      }
-
-      // Recurse.
-      VectorToArrayConverter converter{vector, value_builder};
-      RETURN_NOT_OK(arrow::VisitTypeInline(*value_type, &converter));
-    }
-
-    return Status::OK();
-  }
-
-  template <typename T>
-  arrow::enable_if_t<is_struct_type<T>::value, Status> Visit(const T& type) {
-    using BuilderType = typename TypeTraits<T>::BuilderType;
-    ARROW_RETURN_IF(!Rf_inherits(x, "data.frame"),
-                    Status::RError("Expecting a data frame"));
-
-    auto* struct_builder = checked_cast<BuilderType*>(builder);
-
-    int64_t n = vctrs::short_vec_size(x);
-    RETURN_NOT_OK(struct_builder->Reserve(n));
-    RETURN_NOT_OK(struct_builder->AppendValues(n, NULLPTR));
-
-    int num_fields = struct_builder->num_fields();
-
-    // Visit each column of the data frame using the associated
-    // field builder
-    for (R_xlen_t i = 0; i < num_fields; i++) {
-      auto column_builder = struct_builder->field_builder(i);
-      SEXP x_i = VECTOR_ELT(x, i);
-      int64_t n_i = vctrs::short_vec_size(x_i);
-      if (n_i != n) {
-        SEXP name_i = STRING_ELT(Rf_getAttrib(x, R_NamesSymbol), i);
-        return Status::RError("Degenerated data frame. Column '", CHAR(name_i),
-                              "' has size ", n_i, " instead of the number of 
rows: ", n);
-      }
-
-      VectorToArrayConverter converter{x_i, column_builder};
-      RETURN_NOT_OK(arrow::VisitTypeInline(*column_builder->type().get(), 
&converter));
-    }
-
-    return Status::OK();
-  }
-
-  template <typename T>
-  arrow::enable_if_t<std::is_same<DictionaryType, T>::value, Status> Visit(
-      const T& type) {
-    // TODO: perhaps this replaces MakeFactorArrayImpl ?
-
-    ARROW_RETURN_IF(!Rf_isFactor(x), Status::RError("Expecting a factor"));
-    int64_t n = vctrs::short_vec_size(x);
-
-    auto* dict_builder = checked_cast<StringDictionaryBuilder*>(builder);
-    RETURN_NOT_OK(dict_builder->Reserve(n));
-
-    SEXP levels = Rf_getAttrib(x, R_LevelsSymbol);
-    auto memo = VectorToArrayConverter::Visit(levels, utf8());
-    RETURN_NOT_OK(dict_builder->InsertMemoValues(*memo));
-
-    int* p_values = INTEGER(x);
-    for (int64_t i = 0; i < n; i++, ++p_values) {
-      int v = *p_values;
-      if (v == NA_INTEGER) {
-        RETURN_NOT_OK(dict_builder->AppendNull());
-      } else {
-        RETURN_NOT_OK(dict_builder->Append(CHAR(STRING_ELT(levels, v - 1))));
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status Visit(const arrow::DataType& type) {
-    return Status::NotImplemented("Converting vector to arrow type ", 
type.ToString(),
-                                  " not implemented");
-  }
-
-  static std::shared_ptr<Array> Visit(SEXP x, const std::shared_ptr<DataType>& 
type) {
-    std::unique_ptr<ArrayBuilder> builder;
-    StopIfNotOk(MakeBuilder(gc_memory_pool(), type, &builder));
-
-    VectorToArrayConverter converter{x, builder.get()};
-    StopIfNotOk(arrow::VisitTypeInline(*type, &converter));
-
-    std::shared_ptr<Array> result;
-    StopIfNotOk(builder->Finish(&result));
-    return result;
-  }
-
-  SEXP x;
-  arrow::ArrayBuilder* builder;
-};
-
-template <typename Type>
-std::shared_ptr<Array> MakeFactorArrayImpl(cpp11::integers factor,
-                                           const 
std::shared_ptr<arrow::DataType>& type) {
-  using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
-  auto n = factor.size();
-
-  std::shared_ptr<Buffer> indices_buffer =
-      ValueOrStop(AllocateBuffer(n * sizeof(value_type), gc_memory_pool()));
-
-  std::vector<std::shared_ptr<Buffer>> buffers{nullptr, indices_buffer};
-
-  int64_t null_count = 0;
-  R_xlen_t i = 0;
-  auto p_factor = factor.begin();
-  auto p_indices = 
reinterpret_cast<value_type*>(indices_buffer->mutable_data());
-  for (; i < n; i++, ++p_indices, ++p_factor) {
-    if (*p_factor == NA_INTEGER) break;
-    *p_indices = *p_factor - 1;
-  }
-
-  if (i < n) {
-    // there are NA's so we need a null buffer
-    auto null_buffer =
-        ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), 
gc_memory_pool()));
-    internal::FirstTimeBitmapWriter 
null_bitmap_writer(null_buffer->mutable_data(), 0, n);
-
-    // catch up
-    for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) {
-      null_bitmap_writer.Set();
-    }
-
-    // resume offset filling
-    for (; i < n; i++, ++p_indices, ++p_factor, null_bitmap_writer.Next()) {
-      if (*p_factor == NA_INTEGER) {
-        null_bitmap_writer.Clear();
-        null_count++;
-      } else {
-        null_bitmap_writer.Set();
-        *p_indices = *p_factor - 1;
-      }
-    }
-
-    null_bitmap_writer.Finish();
-    buffers[0] = std::move(null_buffer);
-  }
-
-  auto array_indices_data =
-      ArrayData::Make(std::make_shared<Type>(), n, std::move(buffers), 
null_count, 0);
-  auto array_indices = MakeArray(array_indices_data);
-
-  SEXP levels = Rf_getAttrib(factor, R_LevelsSymbol);
-  auto dict = VectorToArrayConverter::Visit(levels, utf8());
-
-  return ValueOrStop(DictionaryArray::FromArrays(type, array_indices, dict));
-}
-
-std::shared_ptr<Array> MakeFactorArray(cpp11::integers factor,
-                                       const std::shared_ptr<arrow::DataType>& 
type) {
-  const auto& dict_type = checked_cast<const arrow::DictionaryType&>(*type);
-  switch (dict_type.index_type()->id()) {
-    case Type::INT8:
-      return MakeFactorArrayImpl<arrow::Int8Type>(factor, type);
-    case Type::INT16:
-      return MakeFactorArrayImpl<arrow::Int16Type>(factor, type);
-    case Type::INT32:
-      return MakeFactorArrayImpl<arrow::Int32Type>(factor, type);
-    case Type::INT64:
-      return MakeFactorArrayImpl<arrow::Int64Type>(factor, type);
-    default:
-      break;
-  }
-
-  cpp11::stop("Cannot convert to dictionary with index_type '%s'",
-              dict_type.index_type()->ToString().c_str());
-}
-
-std::shared_ptr<Array> MakeStructArray(SEXP df, const 
std::shared_ptr<DataType>& type) {
-  int n = type->num_fields();
-  std::vector<std::shared_ptr<Array>> children(n);
-  for (int i = 0; i < n; i++) {
-    children[i] = Array__from_vector(VECTOR_ELT(df, i), 
type->field(i)->type(), true);
-  }
-
-  int64_t rows = n ? children[0]->length() : 0;
-  return std::make_shared<StructArray>(type, rows, children);
-}
-
-template <typename T>
-int64_t time_cast(T value);
-
-template <>
-inline int64_t time_cast<int>(int value) {
-  return static_cast<int64_t>(value) * 1000;
-}
-
-template <>
-inline int64_t time_cast<double>(double value) {
-  return static_cast<int64_t>(value * 1000);
-}
-
-}  // namespace r
-}  // namespace arrow
-
-// ---------------- new api
-
-namespace arrow {
-
-namespace internal {
-
-template <typename T, typename Target,
-          typename std::enable_if<std::is_signed<Target>::value, Target>::type 
= 0>
-Status int_cast(T x, Target* out) {
-  if (static_cast<int64_t>(x) < std::numeric_limits<Target>::min() ||
-      static_cast<int64_t>(x) > std::numeric_limits<Target>::max()) {
-    return Status::Invalid("Value is too large to fit in C integer type");
-  }
-  *out = static_cast<Target>(x);
-  return Status::OK();
-}
-
-template <typename T>
-struct usigned_type;
-
-template <typename T, typename Target,
-          typename std::enable_if<std::is_unsigned<Target>::value, 
Target>::type = 0>
-Status int_cast(T x, Target* out) {
-  // we need to compare between unsigned integers
-  uint64_t x64 = x;
-  if (x64 < 0 || x64 > std::numeric_limits<Target>::max()) {
-    return Status::Invalid("Value is too large to fit in C integer type");
-  }
-  *out = static_cast<Target>(x);
-  return Status::OK();
-}
-
-template <typename Int>
-Status double_cast(Int x, double* out) {
-  *out = static_cast<double>(x);
-  return Status::OK();
-}
-
-template <>
-Status double_cast<int64_t>(int64_t x, double* out) {
-  constexpr int64_t kDoubleMax = 1LL << 53;
-  constexpr int64_t kDoubleMin = -(1LL << 53);
-
-  if (x < kDoubleMin || x > kDoubleMax) {
-    return Status::Invalid("integer value ", x, " is outside of the range 
exactly",
-                           " representable by a IEEE 754 double precision 
value");
-  }
-  *out = static_cast<double>(x);
-  return Status::OK();
-}
-
-// used for int and int64_t
-template <typename T>
-Status float_cast(T x, float* out) {
-  constexpr int64_t kHalfFloatMax = 1LL << 24;
-  constexpr int64_t kHalfFloatMin = -(1LL << 24);
-
-  int64_t x64 = static_cast<int64_t>(x);
-  if (x64 < kHalfFloatMin || x64 > kHalfFloatMax) {
-    return Status::Invalid("integer value ", x, " is outside of the range 
exactly",
-                           " representable by a IEEE 754 half precision 
value");
-  }
-
-  *out = static_cast<float>(x);
-  return Status::OK();
-}
-
-template <>
-Status float_cast<double>(double x, float* out) {
-  // TODO: is there some sort of floating point overflow ?
-  *out = static_cast<float>(x);
-  return Status::OK();
-}
-
-}  // namespace internal
-
-namespace r {
-
-class VectorConverter;
-
-Status GetConverter(const std::shared_ptr<DataType>& type,
-                    std::unique_ptr<VectorConverter>* out);
-
-class VectorConverter {
- public:
-  virtual ~VectorConverter() = default;
-
-  virtual Status Init(ArrayBuilder* builder) = 0;
-
-  virtual Status Ingest(SEXP obj) = 0;
-
-  virtual Status GetResult(std::shared_ptr<arrow::Array>* result) {
-    return builder_->Finish(result);
-  }
-
-  ArrayBuilder* builder() const { return builder_; }
-
- protected:
-  ArrayBuilder* builder_;
-};
-
-class NullVectorConverter : public VectorConverter {
- public:
-  using BuilderType = NullBuilder;
-
-  ~NullVectorConverter() {}
-
-  Status Init(ArrayBuilder* builder) override {
-    builder_ = builder;
-    typed_builder_ = checked_cast<BuilderType*>(builder_);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) override {
-    RETURN_NOT_OK(typed_builder_->AppendNulls(XLENGTH(obj)));
-    return Status::OK();
-  }
-
- protected:
-  BuilderType* typed_builder_;
-};
-
-template <typename Type, typename Enable = void>
-struct Unbox {};
-
-// unboxer for int type
-template <typename Type>
-struct Unbox<Type, enable_if_integer<Type>> {
-  using BuilderType = typename TypeTraits<Type>::BuilderType;
-  using ArrayType = typename TypeTraits<Type>::ArrayType;
-  using CType = typename ArrayType::value_type;
-
-  static inline Status Ingest(BuilderType* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      case INTSXP:
-        return IngestRange<int>(builder, INTEGER(obj), XLENGTH(obj));
-      case REALSXP:
-        if (Rf_inherits(obj, "integer64")) {
-          return IngestRange<int64_t>(builder, 
reinterpret_cast<int64_t*>(REAL(obj)),
-                                      XLENGTH(obj));
-        }
-        return IngestRange(builder, REAL(obj), XLENGTH(obj));
-
-      // TODO: handle raw and logical
-      default:
-        break;
-    }
-
-    return Status::Invalid("Cannot convert R vector of type <", 
Rf_type2char(TYPEOF(obj)),
-                           "> to integer Arrow array");
-  }
-
-  template <typename T>
-  static inline Status IngestRange(BuilderType* builder, T* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (is_na<T>(*p)) {
-        builder->UnsafeAppendNull();
-      } else {
-        CType value = 0;
-        RETURN_NOT_OK(internal::int_cast(*p, &value));
-        builder->UnsafeAppend(value);
-      }
-    }
-    return Status::OK();
-  }
-};
-
-template <>
-struct Unbox<DoubleType> {
-  static inline Status Ingest(DoubleBuilder* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      // TODO: handle RAW
-      case INTSXP:
-        return IngestIntRange<int>(builder, INTEGER(obj), XLENGTH(obj), 
NA_INTEGER);
-      case REALSXP:
-        if (Rf_inherits(obj, "integer64")) {
-          return IngestIntRange<int64_t>(builder, 
reinterpret_cast<int64_t*>(REAL(obj)),
-                                         XLENGTH(obj), NA_INT64);
-        }
-        return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj));
-    }
-    return Status::Invalid("Cannot convert R object to double type");
-  }
-
-  template <typename T>
-  static inline Status IngestIntRange(DoubleBuilder* builder, T* p, R_xlen_t n, T na) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (*p == NA_INTEGER) {
-        builder->UnsafeAppendNull();
-      } else {
-        double value = 0;
-        RETURN_NOT_OK(internal::double_cast(*p, &value));
-        builder->UnsafeAppend(value);
-      }
-    }
-    return Status::OK();
-  }
-
-  static inline Status IngestDoubleRange(DoubleBuilder* builder, double* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (ISNA(*p)) {
-        builder->UnsafeAppendNull();
-      } else {
-        builder->UnsafeAppend(*p);
-      }
-    }
-    return Status::OK();
-  }
-};
-
-template <>
-struct Unbox<FloatType> {
-  static inline Status Ingest(FloatBuilder* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      // TODO: handle RAW
-      case INTSXP:
-        return IngestIntRange<int>(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER);
-      case REALSXP:
-        if (Rf_inherits(obj, "integer64")) {
-          return IngestIntRange<int64_t>(builder, reinterpret_cast<int64_t*>(REAL(obj)),
-                                         XLENGTH(obj), NA_INT64);
-        }
-        return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj));
-    }
-    return Status::Invalid("Cannot convert R object to double type");
-  }
-
-  template <typename T>
-  static inline Status IngestIntRange(FloatBuilder* builder, T* p, R_xlen_t n, T na) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (*p == NA_INTEGER) {
-        builder->UnsafeAppendNull();
-      } else {
-        float value = 0;
-        RETURN_NOT_OK(internal::float_cast(*p, &value));
-        builder->UnsafeAppend(value);
-      }
-    }
-    return Status::OK();
-  }
-
-  static inline Status IngestDoubleRange(FloatBuilder* builder, double* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (ISNA(*p)) {
-        builder->UnsafeAppendNull();
-      } else {
-        float value;
-        RETURN_NOT_OK(internal::float_cast(*p, &value));
-        builder->UnsafeAppend(value);
-      }
-    }
-    return Status::OK();
-  }
-};
-
-template <>
-struct Unbox<BooleanType> {
-  static inline Status Ingest(BooleanBuilder* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      case LGLSXP: {
-        R_xlen_t n = XLENGTH(obj);
-        RETURN_NOT_OK(builder->Resize(n));
-        int* p = LOGICAL(obj);
-        for (R_xlen_t i = 0; i < n; i++, ++p) {
-          if (*p == NA_LOGICAL) {
-            builder->UnsafeAppendNull();
-          } else {
-            builder->UnsafeAppend(*p == 1);
-          }
-        }
-        return Status::OK();
-      }
-
-      default:
-        break;
-    }
-
-    // TODO: include more information about the R object and the target type
-    return Status::Invalid("Cannot convert R object to boolean type");
-  }
-};
-
-template <>
-struct Unbox<Date32Type> {
-  static inline Status Ingest(Date32Builder* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      case INTSXP:
-        if (Rf_inherits(obj, "Date")) {
-          return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj));
-        }
-        break;
-      case REALSXP:
-        if (Rf_inherits(obj, "Date")) {
-          return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj));
-        }
-        break;
-      default:
-        break;
-    }
-    return Status::Invalid("Cannot convert R object to date32 type");
-  }
-
-  static inline Status IngestIntRange(Date32Builder* builder, int* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (*p == NA_INTEGER) {
-        builder->UnsafeAppendNull();
-      } else {
-        builder->UnsafeAppend(*p);
-      }
-    }
-    return Status::OK();
-  }
-
-  static inline Status IngestDoubleRange(Date32Builder* builder, double* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (ISNA(*p)) {
-        builder->UnsafeAppendNull();
-      } else {
-        builder->UnsafeAppend(static_cast<int>(*p));
-      }
-    }
-    return Status::OK();
-  }
-};
-
-template <>
-struct Unbox<Date64Type> {
-  constexpr static int64_t kMillisecondsPerDay = 86400000;
-
-  static inline Status Ingest(Date64Builder* builder, SEXP obj) {
-    switch (TYPEOF(obj)) {
-      case INTSXP:
-        // number of days since epoch
-        if (Rf_inherits(obj, "Date")) {
-          return IngestDateInt32Range(builder, INTEGER(obj), XLENGTH(obj));
-        }
-        break;
-
-      case REALSXP:
-        // (fractional number of days since epoch)
-        if (Rf_inherits(obj, "Date")) {
-          return IngestDateDoubleRange<kMillisecondsPerDay>(builder, REAL(obj),
-                                                            XLENGTH(obj));
-        }
-
-        // number of seconds since epoch
-        if (Rf_inherits(obj, "POSIXct")) {
-          return IngestDateDoubleRange<1000>(builder, REAL(obj), XLENGTH(obj));
-        }
-    }
-    return Status::Invalid("Cannot convert R object to date64 type");
-  }
-
-  // ingest a integer vector that represents number of days since epoch
-  static inline Status IngestDateInt32Range(Date64Builder* builder, int* p, R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (*p == NA_INTEGER) {
-        builder->UnsafeAppendNull();
-      } else {
-        builder->UnsafeAppend(*p * kMillisecondsPerDay);
-      }
-    }
-    return Status::OK();
-  }
-
-  // ingest a numeric vector that represents (fractional) number of days since epoch
-  template <int64_t MULTIPLIER>
-  static inline Status IngestDateDoubleRange(Date64Builder* builder, double* p,
-                                             R_xlen_t n) {
-    RETURN_NOT_OK(builder->Resize(n));
-
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (ISNA(*p)) {
-        builder->UnsafeAppendNull();
-      } else {
-        builder->UnsafeAppend(static_cast<int64_t>(*p * MULTIPLIER));
-      }
-    }
-    return Status::OK();
-  }
-};
-
-template <typename Type, class Derived>
-class TypedVectorConverter : public VectorConverter {
- public:
-  using BuilderType = typename TypeTraits<Type>::BuilderType;
-
-  Status Init(ArrayBuilder* builder) override {
-    builder_ = builder;
-    typed_builder_ = checked_cast<BuilderType*>(builder_);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) override { return Unbox<Type>::Ingest(typed_builder_, obj); }
-
- protected:
-  BuilderType* typed_builder_;
-};
-
-template <typename Type>
-class NumericVectorConverter
-    : public TypedVectorConverter<Type, NumericVectorConverter<Type>> {};
-
-class BooleanVectorConverter
-    : public TypedVectorConverter<BooleanType, BooleanVectorConverter> {};
-
-class Date32Converter : public TypedVectorConverter<Date32Type, Date32Converter> {};
-class Date64Converter : public TypedVectorConverter<Date64Type, Date64Converter> {};
-
-inline int64_t get_time_multiplier(TimeUnit::type unit) {
-  switch (unit) {
-    case TimeUnit::SECOND:
-      return 1;
-    case TimeUnit::MILLI:
-      return 1000;
-    case TimeUnit::MICRO:
-      return 1000000;
-    case TimeUnit::NANO:
-      return 1000000000;
-    default:
-      return 0;
-  }
-}
-
-template <typename Type>
-class TimeConverter : public VectorConverter {
-  using BuilderType = typename TypeTraits<Type>::BuilderType;
-
- public:
-  explicit TimeConverter(TimeUnit::type unit)
-      : unit_(unit), multiplier_(get_time_multiplier(unit)) {}
-
-  Status Init(ArrayBuilder* builder) override {
-    builder_ = builder;
-    typed_builder_ = checked_cast<BuilderType*>(builder);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) override {
-    if (valid_R_object(obj)) {
-      int difftime_multiplier;
-      RETURN_NOT_OK(GetDifftimeMultiplier(obj, &difftime_multiplier));
-      return Ingest_POSIXct(REAL(obj), XLENGTH(obj), difftime_multiplier);
-    }
-
-    return Status::Invalid("Cannot convert R object to timestamp type");
-  }
-
- protected:
-  TimeUnit::type unit_;
-  BuilderType* typed_builder_;
-  int64_t multiplier_;
-
-  Status Ingest_POSIXct(double* p, R_xlen_t n, int difftime_multiplier) {
-    RETURN_NOT_OK(typed_builder_->Resize(n));
-
-    for (R_xlen_t i = 0; i < n; i++, ++p) {
-      if (ISNA(*p)) {
-        typed_builder_->UnsafeAppendNull();
-      } else {
-        typed_builder_->UnsafeAppend(
-            static_cast<int64_t>(*p * multiplier_ * difftime_multiplier));
-      }
-    }
-    return Status::OK();
-  }
-
-  virtual bool valid_R_object(SEXP obj) = 0;
-
-  // only used for Time32 and Time64
-  virtual Status GetDifftimeMultiplier(SEXP obj, int* res) {
-    std::string unit(CHAR(STRING_ELT(Rf_getAttrib(obj, symbols::units), 0)));
-    if (unit == "secs") {
-      *res = 1;
-    } else if (unit == "mins") {
-      *res = 60;
-    } else if (unit == "hours") {
-      *res = 3600;
-    } else if (unit == "days") {
-      *res = 86400;
-    } else if (unit == "weeks") {
-      *res = 604800;
-    } else {
-      return Status::Invalid("unknown difftime unit");
-    }
-    return Status::OK();
-  }
-};
-
-class TimestampConverter : public TimeConverter<TimestampType> {
- public:
-  explicit TimestampConverter(TimeUnit::type unit) : TimeConverter<TimestampType>(unit) {}
-
- protected:
-  bool valid_R_object(SEXP obj) override {
-    return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "POSIXct");
-  }
-
-  Status GetDifftimeMultiplier(SEXP obj, int* res) override {
-    *res = 1;
-    return Status::OK();
-  }
-};
-
-class Time32Converter : public TimeConverter<Time32Type> {
- public:
-  explicit Time32Converter(TimeUnit::type unit) : TimeConverter<Time32Type>(unit) {}
-
- protected:
-  bool valid_R_object(SEXP obj) override {
-    return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime");
-  }
-};
-
-class Time64Converter : public TimeConverter<Time64Type> {
- public:
-  explicit Time64Converter(TimeUnit::type unit) : TimeConverter<Time64Type>(unit) {}
-
- protected:
-  bool valid_R_object(SEXP obj) override {
-    return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime");
-  }
-};
-
-template <typename Builder>
-class BinaryVectorConverter : public VectorConverter {
- public:
-  ~BinaryVectorConverter() {}
-
-  Status Init(ArrayBuilder* builder) {
-    typed_builder_ = checked_cast<Builder*>(builder);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) {
-    ARROW_RETURN_IF(TYPEOF(obj) != VECSXP, Status::RError("Expecting a list"));
-    R_xlen_t n = XLENGTH(obj);
-
-    // Reserve enough space before appending
-    int64_t size = 0;
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP obj_i = VECTOR_ELT(obj, i);
-      if (!Rf_isNull(obj_i)) {
-        ARROW_RETURN_IF(TYPEOF(obj_i) != RAWSXP,
-                        Status::RError("Expecting a raw vector"));
-        size += XLENGTH(obj_i);
-      }
-    }
-    RETURN_NOT_OK(typed_builder_->Reserve(size));
-
-    // append
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP obj_i = VECTOR_ELT(obj, i);
-      if (Rf_isNull(obj_i)) {
-        RETURN_NOT_OK(typed_builder_->AppendNull());
-      } else {
-        RETURN_NOT_OK(typed_builder_->Append(RAW(obj_i), XLENGTH(obj_i)));
-      }
-    }
-    return Status::OK();
-  }
-
-  Status GetResult(std::shared_ptr<arrow::Array>* result) {
-    return typed_builder_->Finish(result);
-  }
-
- private:
-  Builder* typed_builder_;
-};
-
-class FixedSizeBinaryVectorConverter : public VectorConverter {
- public:
-  ~FixedSizeBinaryVectorConverter() {}
-
-  Status Init(ArrayBuilder* builder) {
-    typed_builder_ = checked_cast<FixedSizeBinaryBuilder*>(builder);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) {
-    ARROW_RETURN_IF(TYPEOF(obj) != VECSXP, Status::RError("Expecting a list"));
-    R_xlen_t n = XLENGTH(obj);
-
-    // Reserve enough space before appending
-    int32_t byte_width = typed_builder_->byte_width();
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP obj_i = VECTOR_ELT(obj, i);
-      if (!Rf_isNull(obj_i)) {
-        ARROW_RETURN_IF(TYPEOF(obj_i) != RAWSXP,
-                        Status::RError("Expecting a raw vector"));
-        ARROW_RETURN_IF(XLENGTH(obj_i) != byte_width,
-                        Status::RError("Expecting a raw vector of ", byte_width,
-                                       " bytes, not ", XLENGTH(obj_i)));
-      }
-    }
-    RETURN_NOT_OK(typed_builder_->Reserve(n * byte_width));
-
-    // append
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP obj_i = VECTOR_ELT(obj, i);
-      if (Rf_isNull(obj_i)) {
-        RETURN_NOT_OK(typed_builder_->AppendNull());
-      } else {
-        RETURN_NOT_OK(typed_builder_->Append(RAW(obj_i)));
-      }
-    }
-    return Status::OK();
-  }
-
-  Status GetResult(std::shared_ptr<arrow::Array>* result) {
-    return typed_builder_->Finish(result);
-  }
-
- private:
-  FixedSizeBinaryBuilder* typed_builder_;
-};
-
-template <typename StringBuilder>
-class StringVectorConverter : public VectorConverter {
- public:
-  ~StringVectorConverter() {}
-
-  Status Init(ArrayBuilder* builder) {
-    typed_builder_ = checked_cast<StringBuilder*>(builder);
-    return Status::OK();
-  }
-
-  Status Ingest(SEXP obj) {
-    ARROW_RETURN_IF(TYPEOF(obj) != STRSXP,
-                    Status::RError("Expecting a character vector"));
-
-    cpp11::strings s(arrow::r::utf8_strings(obj));
-    RETURN_NOT_OK(typed_builder_->Reserve(s.size()));
-
-    // we know all the R strings are utf8 already, so we can get
-    // a definite size and then use UnsafeAppend*()
-    int64_t total_length = 0;
-    for (cpp11::r_string si : s) {
-      total_length += cpp11::is_na(si) ? 0 : si.size();
-    }
-    RETURN_NOT_OK(typed_builder_->ReserveData(total_length));
-
-    // append
-    for (cpp11::r_string si : s) {
-      if (si == NA_STRING) {
-        typed_builder_->UnsafeAppendNull();
-      } else {
-        typed_builder_->UnsafeAppend(CHAR(si), si.size());
-      }
-    }
-
-    return Status::OK();
-  }
-
-  Status GetResult(std::shared_ptr<arrow::Array>* result) {
-    return typed_builder_->Finish(result);
-  }
-
- private:
-  StringBuilder* typed_builder_;
-};
-
-#define NUMERIC_CONVERTER(TYPE_ENUM, TYPE)                                               \
-  case Type::TYPE_ENUM:                                                                  \
-    *out =                                                                               \
-        std::unique_ptr<NumericVectorConverter<TYPE>>(new NumericVectorConverter<TYPE>); \
-    return Status::OK()
-
-#define SIMPLE_CONVERTER_CASE(TYPE_ENUM, TYPE) \
-  case Type::TYPE_ENUM:                        \
-    *out = std::unique_ptr<TYPE>(new TYPE);    \
-    return Status::OK()
-
-#define TIME_CONVERTER_CASE(TYPE_ENUM, DATA_TYPE, TYPE)                                \
-  case Type::TYPE_ENUM:                                                                \
-    *out =                                                                             \
-        std::unique_ptr<TYPE>(new TYPE(checked_cast<DATA_TYPE*>(type.get())->unit())); \
-    return Status::OK()
-
-Status GetConverter(const std::shared_ptr<DataType>& type,
-                    std::unique_ptr<VectorConverter>* out) {
-  switch (type->id()) {
-    SIMPLE_CONVERTER_CASE(BINARY, BinaryVectorConverter<arrow::BinaryBuilder>);
-    SIMPLE_CONVERTER_CASE(LARGE_BINARY, BinaryVectorConverter<arrow::LargeBinaryBuilder>);
-    SIMPLE_CONVERTER_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryVectorConverter);
-    SIMPLE_CONVERTER_CASE(BOOL, BooleanVectorConverter);
-    SIMPLE_CONVERTER_CASE(STRING, StringVectorConverter<arrow::StringBuilder>);
-    SIMPLE_CONVERTER_CASE(LARGE_STRING, StringVectorConverter<arrow::LargeStringBuilder>);
-    NUMERIC_CONVERTER(INT8, Int8Type);
-    NUMERIC_CONVERTER(INT16, Int16Type);
-    NUMERIC_CONVERTER(INT32, Int32Type);
-    NUMERIC_CONVERTER(INT64, Int64Type);
-    NUMERIC_CONVERTER(UINT8, UInt8Type);
-    NUMERIC_CONVERTER(UINT16, UInt16Type);
-    NUMERIC_CONVERTER(UINT32, UInt32Type);
-    NUMERIC_CONVERTER(UINT64, UInt64Type);
-
-    // TODO: not sure how to handle half floats
-    //       the python code uses npy_half
-    // NUMERIC_CONVERTER(HALF_FLOAT, HalfFloatType);
-    NUMERIC_CONVERTER(FLOAT, FloatType);
-    NUMERIC_CONVERTER(DOUBLE, DoubleType);
-
-    SIMPLE_CONVERTER_CASE(DATE32, Date32Converter);
-    SIMPLE_CONVERTER_CASE(DATE64, Date64Converter);
-
-    // TODO: probably after we merge ARROW-3628
-    // case Type::DECIMAL:
-
-    TIME_CONVERTER_CASE(TIME32, Time32Type, Time32Converter);
-    TIME_CONVERTER_CASE(TIME64, Time64Type, Time64Converter);
-    TIME_CONVERTER_CASE(TIMESTAMP, TimestampType, TimestampConverter);
-
-    case Type::NA:
-      *out = std::unique_ptr<NullVectorConverter>(new NullVectorConverter);
-      return Status::OK();
-
-    default:
-      break;
-  }
-  return Status::NotImplemented("type not implemented");
-}
-
-static inline std::shared_ptr<arrow::DataType> IndexTypeForFactors(int n_factors) {
-  if (n_factors < INT8_MAX) {
-    return arrow::int8();
-  } else if (n_factors < INT16_MAX) {
-    return arrow::int16();
-  } else {
-    return arrow::int32();
-  }
-}
-
-std::shared_ptr<arrow::DataType> InferArrowTypeFromFactor(SEXP factor) {
-  SEXP factors = Rf_getAttrib(factor, R_LevelsSymbol);
-  auto index_type = IndexTypeForFactors(Rf_length(factors));
-  bool is_ordered = Rf_inherits(factor, "ordered");
-  return dictionary(index_type, arrow::utf8(), is_ordered);
-}
-
-template <int VectorType>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector(SEXP x) {
-  cpp11::stop("Unknown vector type: ", VectorType);
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<ENVSXP>(SEXP x) {
-  if (Rf_inherits(x, "Array")) {
-    return cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x)->type();
-  }
-
-  cpp11::stop("Unrecognized vector instance for type ENVSXP");
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<LGLSXP>(SEXP x) {
-  return Rf_inherits(x, "vctrs_unspecified") ? null() : boolean();
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<INTSXP>(SEXP x) {
-  if (Rf_isFactor(x)) {
-    return InferArrowTypeFromFactor(x);
-  } else if (Rf_inherits(x, "Date")) {
-    return date32();
-  } else if (Rf_inherits(x, "POSIXct")) {
-    auto tzone_sexp = Rf_getAttrib(x, symbols::tzone);
-    if (Rf_isNull(tzone_sexp)) {
-      return timestamp(TimeUnit::MICRO);
-    } else {
-      return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0)));
-    }
-  }
-  return int32();
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<REALSXP>(SEXP x) {
-  if (Rf_inherits(x, "Date")) {
-    return date32();
-  }
-  if (Rf_inherits(x, "POSIXct")) {
-    auto tzone_sexp = Rf_getAttrib(x, symbols::tzone);
-    if (Rf_isNull(tzone_sexp)) {
-      return timestamp(TimeUnit::MICRO);
-    } else {
-      return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0)));
-    }
-  }
-  if (Rf_inherits(x, "integer64")) {
-    return int64();
-  }
-  if (Rf_inherits(x, "difftime")) {
-    return time32(TimeUnit::SECOND);
-  }
-  return float64();
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<STRSXP>(SEXP x) {
-  return cpp11::unwind_protect([&] {
-    R_xlen_t n = XLENGTH(x);
-
-    int64_t size = 0;
-
-    for (R_xlen_t i = 0; i < n; i++) {
-      size += arrow::r::unsafe::r_string_size(STRING_ELT(x, i));
-      if (size > arrow::kBinaryMemoryLimit) {
-        // Exceeds 2GB capacity of utf8 type, so use large
-        return large_utf8();
-      }
-    }
-
-    return utf8();
-  });
-}
-
-static inline std::shared_ptr<arrow::DataType> InferArrowTypeFromDataFrame(
-    cpp11::list x) {
-  R_xlen_t n = x.size();
-  cpp11::strings names(x.attr(R_NamesSymbol));
-  std::vector<std::shared_ptr<arrow::Field>> fields(n);
-  for (R_xlen_t i = 0; i < n; i++) {
-    fields[i] = arrow::field(names[i], InferArrowType(x[i]));
-  }
-  return arrow::struct_(std::move(fields));
-}
-
-template <>
-std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<VECSXP>(SEXP x) {
-  if (Rf_inherits(x, "data.frame") || Rf_inherits(x, "POSIXlt")) {
-    return InferArrowTypeFromDataFrame(x);
-  } else {
-    // some known special cases
-    if (Rf_inherits(x, "arrow_fixed_size_binary")) {
-      SEXP byte_width = Rf_getAttrib(x, symbols::byte_width);
-      if (Rf_isNull(byte_width) || TYPEOF(byte_width) != INTSXP ||
-          XLENGTH(byte_width) != 1) {
-        cpp11::stop("malformed arrow_fixed_size_binary object");
-      }
-      return arrow::fixed_size_binary(INTEGER(byte_width)[0]);
-    }
-
-    if (Rf_inherits(x, "arrow_binary")) {
-      return arrow::binary();
-    }
-
-    if (Rf_inherits(x, "arrow_large_binary")) {
-      return arrow::large_binary();
-    }
-
-    SEXP ptype = Rf_getAttrib(x, symbols::ptype);
-    if (Rf_isNull(ptype)) {
-      if (XLENGTH(x) == 0) {
-        cpp11::stop(
-            "Requires at least one element to infer the values' type of a list vector");
-      }
-
-      ptype = VECTOR_ELT(x, 0);
-    }
-
-    return arrow::list(InferArrowType(ptype));
-  }
-}
-
-std::shared_ptr<arrow::DataType> InferArrowType(SEXP x) {
-  switch (TYPEOF(x)) {
-    case ENVSXP:
-      return InferArrowTypeFromVector<ENVSXP>(x);
-    case LGLSXP:
-      return InferArrowTypeFromVector<LGLSXP>(x);
-    case INTSXP:
-      return InferArrowTypeFromVector<INTSXP>(x);
-    case REALSXP:
-      return InferArrowTypeFromVector<REALSXP>(x);
-    case RAWSXP:
-      return int8();
-    case STRSXP:
-      return InferArrowTypeFromVector<STRSXP>(x);
-    case VECSXP:
-      return InferArrowTypeFromVector<VECSXP>(x);
-    default:
-      break;
-  }
-
-  cpp11::stop("Cannot infer type from vector");
-}
-
-// in some situations we can just use the memory of the R object in an RBuffer
-// instead of going through ArrayBuilder, etc ...
-bool can_reuse_memory(SEXP x, const std::shared_ptr<arrow::DataType>& type) {
-  switch (type->id()) {
-    case Type::INT32:
-      return TYPEOF(x) == INTSXP && !OBJECT(x);
-    case Type::DOUBLE:
-      return TYPEOF(x) == REALSXP && !OBJECT(x);
-    case Type::INT8:
-      return TYPEOF(x) == RAWSXP && !OBJECT(x);
-    case Type::INT64:
-      return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64");
-    default:
-      break;
-  }
-  return false;
-}
-
-// this is only used on some special cases when the arrow Array can just use the memory of
-// the R object, via an RBuffer, hence be zero copy
-template <int RTYPE, typename RVector, typename Type>
-std::shared_ptr<Array> MakeSimpleArray(SEXP x) {
-  using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
-  RVector vec(x);
-  auto n = vec.size();
-  auto p_vec_start = reinterpret_cast<value_type*>(DATAPTR(vec));
-  auto p_vec_end = p_vec_start + n;
-  std::vector<std::shared_ptr<Buffer>> buffers{nullptr,
-                                               std::make_shared<RBuffer<RVector>>(vec)};
-
-  int null_count = 0;
-
-  auto first_na = std::find_if(p_vec_start, p_vec_end, is_na<value_type>);
-  if (first_na < p_vec_end) {
-    auto null_bitmap =
-        ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool()));
-    internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n);
-
-    // first loop to clear all the bits before the first NA
-    auto j = std::distance(p_vec_start, first_na);
-    int i = 0;
-    for (; i < j; i++, bitmap_writer.Next()) {
-      bitmap_writer.Set();
-    }
-
-    auto p_vec = first_na;
-    // then finish
-    for (; i < n; i++, bitmap_writer.Next(), ++p_vec) {
-      if (is_na<value_type>(*p_vec)) {
-        bitmap_writer.Clear();
-        null_count++;
-      } else {
-        bitmap_writer.Set();
-      }
-    }
-
-    bitmap_writer.Finish();
-    buffers[0] = std::move(null_bitmap);
-  }
-
-  auto data = ArrayData::Make(std::make_shared<Type>(), LENGTH(x), std::move(buffers),
-                              null_count, 0 /*offset*/);
-
-  // return the right Array class
-  return std::make_shared<typename TypeTraits<Type>::ArrayType>(data);
-}
-
-std::shared_ptr<arrow::Array> Array__from_vector_reuse_memory(SEXP x) {
-  auto type = TYPEOF(x);
-
-  if (type == INTSXP) {
-    return MakeSimpleArray<INTSXP, cpp11::integers, Int32Type>(x);
-  } else if (type == REALSXP && Rf_inherits(x, "integer64")) {
-    return MakeSimpleArray<REALSXP, cpp11::doubles, Int64Type>(x);
-  } else if (type == REALSXP) {
-    return MakeSimpleArray<REALSXP, cpp11::doubles, DoubleType>(x);
-  } else if (type == RAWSXP) {
-    return MakeSimpleArray<RAWSXP, cpp11::raws, UInt8Type>(x);
-  }
-
-  cpp11::stop("Unreachable: you might need to fix can_reuse_memory()");
-}
-
-bool CheckCompatibleFactor(SEXP obj, const std::shared_ptr<arrow::DataType>& type) {
-  if (!Rf_inherits(obj, "factor")) {
-    return false;
-  }
-
-  const auto& dict_type = checked_cast<const arrow::DictionaryType&>(*type);
-  return dict_type.value_type()->Equals(utf8());
-}
-
-arrow::Status CheckCompatibleStruct(SEXP obj,
-                                    const std::shared_ptr<arrow::DataType>& type) {
-  if (!Rf_inherits(obj, "data.frame")) {
-    return Status::RError("Conversion to struct arrays requires a data.frame");
-  }
-
-  // check the number of columns
-  int num_fields = type->num_fields();
-  if (XLENGTH(obj) != num_fields) {
-    return Status::RError("Number of fields in struct (", num_fields,
-                          ") incompatible with number of columns in the data frame (",
-                          XLENGTH(obj), ")");
-  }
-
-  // check the names of each column
-  //
-  // the columns themselves are not checked against the
-  // types of the fields, because Array__from_vector will error
-  // when not compatible.
-  cpp11::strings names = Rf_getAttrib(obj, R_NamesSymbol);
-
-  return cpp11::unwind_protect([&] {
-    for (int i = 0; i < num_fields; i++) {
-      const char* name_i = arrow::r::unsafe::utf8_string(names[i]);
-      auto field_name = type->field(i)->name();
-      if (field_name != name_i) {
-        return Status::RError(
-            "Field name in position ", i, " (", field_name,
-            ") does not match the name of the column of the data frame (", name_i, ")");
-      }
-    }
-
-    return Status::OK();
-  });
-}
-
-std::shared_ptr<arrow::Array> Array__from_vector(
-    SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_inferred) {
-  // short circuit if `x` is already an Array
-  if (Rf_inherits(x, "Array")) {
-    return cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x);
-  }
-
-  // special case when we can just use the data from the R vector
-  // directly. This still needs to handle the null bitmap
-  if (arrow::r::can_reuse_memory(x, type)) {
-    return arrow::r::Array__from_vector_reuse_memory(x);
-  }
-
-  // factors only when type has been inferred
-  if (type->id() == Type::DICTIONARY) {
-    if (type_inferred || arrow::r::CheckCompatibleFactor(x, type)) {
-      // TODO: use VectorToArrayConverter instead, but it does not appear to work
-      // correctly with ordered dictionary yet
-      //
-      // return VectorToArrayConverter::Visit(x, type);
-      return arrow::r::MakeFactorArray(x, type);
-    }
-
-    cpp11::stop("Object incompatible with dictionary type");
-  }
-
-  if (type->id() == Type::LIST || type->id() == Type::LARGE_LIST ||
-      type->id() == Type::FIXED_SIZE_LIST) {
-    return VectorToArrayConverter::Visit(x, type);
-  }
-
-  // struct types
-  if (type->id() == Type::STRUCT) {
-    if (!type_inferred) {
-      StopIfNotOk(arrow::r::CheckCompatibleStruct(x, type));
-    }
-    // TODO: when the type has been infered, we could go through
-    //       VectorToArrayConverter:
-    //
-    // else {
-    //   return VectorToArrayConverter::Visit(df, type);
-    // }
-
-    return arrow::r::MakeStructArray(x, type);
-  }
-
-  // general conversion with converter and builder
-  std::unique_ptr<arrow::r::VectorConverter> converter;
-  StopIfNotOk(arrow::r::GetConverter(type, &converter));
-
-  // Create ArrayBuilder for type
-  std::unique_ptr<arrow::ArrayBuilder> type_builder;
-  StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder));
-  StopIfNotOk(converter->Init(type_builder.get()));
-
-  // ingest R data and grab the result array
-  StopIfNotOk(converter->Ingest(x));
-  std::shared_ptr<arrow::Array> result;
-  StopIfNotOk(converter->GetResult(&result));
-  return result;
-}
-
-}  // namespace r
-}  // namespace arrow
-
-// [[arrow::export]]
-std::shared_ptr<arrow::DataType> Array__infer_type(SEXP x) {
-  return arrow::r::InferArrowType(x);
-}
-
-// [[arrow::export]]
-std::shared_ptr<arrow::Array> Array__from_vector(SEXP x, SEXP s_type) {
-  // the type might be NULL, in which case we need to infer it from the data
-  // we keep track of whether it was inferred or supplied
-  bool type_inferred = Rf_isNull(s_type);
-  std::shared_ptr<arrow::DataType> type;
-  if (type_inferred) {
-    type = arrow::r::InferArrowType(x);
-  } else {
-    type = cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(s_type);
-  }
-
-  return arrow::r::Array__from_vector(x, type, type_inferred);
-}
-
-// [[arrow::export]]
-std::shared_ptr<arrow::ChunkedArray> ChunkedArray__from_list(cpp11::list chunks,
-                                                             SEXP s_type) {
-  std::vector<std::shared_ptr<arrow::Array>> vec;
-
-  // the type might be NULL, in which case we need to infer it from the data
-  // we keep track of whether it was inferred or supplied
-  bool type_inferred = Rf_isNull(s_type);
-  R_xlen_t n = XLENGTH(chunks);
-
-  std::shared_ptr<arrow::DataType> type;
-  if (type_inferred) {
-    if (n == 0) {
-      cpp11::stop("type must be specified for empty list");
-    }
-    type = arrow::r::InferArrowType(VECTOR_ELT(chunks, 0));
-  } else {
-    type = cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(s_type);
-  }
-
-  if (n == 0) {
-    std::shared_ptr<arrow::Array> array;
-    std::unique_ptr<arrow::ArrayBuilder> type_builder;
-    StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder));
-    StopIfNotOk(type_builder->Finish(&array));
-    vec.push_back(array);
-  } else {
-    // the first - might differ from the rest of the loop
-    // because we might have inferred the type from the first element of the list
-    //
-    // this only really matters for dictionary arrays
-    vec.push_back(arrow::r::Array__from_vector(chunks[0], type, type_inferred));
-
-    for (R_xlen_t i = 1; i < n; i++) {
-      vec.push_back(arrow::r::Array__from_vector(chunks[i], type, false));
-    }
-  }
-
-  return std::make_shared<arrow::ChunkedArray>(std::move(vec));
-}
-
-// [[arrow::export]]
-std::shared_ptr<arrow::Array> DictionaryArray__FromArrays(
-    const std::shared_ptr<arrow::DataType>& type,
-    const std::shared_ptr<arrow::Array>& indices,
-    const std::shared_ptr<arrow::Array>& dict) {
-  return ValueOrStop(arrow::DictionaryArray::FromArrays(type, indices, dict));
-}
-
-#endif
diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp
index c9a1f6c..ddcb749 100644
--- a/r/src/array_to_vector.cpp
+++ b/r/src/array_to_vector.cpp
@@ -18,8 +18,6 @@
 #include "./arrow_types.h"
 #if defined(ARROW_R_WITH_ARROW)
 
-#include <type_traits>
-
 #include <arrow/array.h>
 #include <arrow/builder.h>
 #include <arrow/datum.h>
@@ -30,6 +28,8 @@
 #include <arrow/util/parallel.h>
 #include <arrow/util/task_group.h>
 
+#include <type_traits>
+
 namespace arrow {
 
 using internal::checked_cast;
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 73ee648..78677e9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -298,42 +298,6 @@ BEGIN_CPP11
        return cpp11::as_sexp(LargeListArray__raw_value_offsets(array));
 END_CPP11
 }
-// array_from_vector.cpp
-std::shared_ptr<arrow::DataType> Array__infer_type(SEXP x);
-extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){
-BEGIN_CPP11
-       arrow::r::Input<SEXP>::type x(x_sexp);
-       return cpp11::as_sexp(Array__infer_type(x));
-END_CPP11
-}
-// array_from_vector.cpp
-std::shared_ptr<arrow::Array> Array__from_vector(SEXP x, SEXP s_type);
-extern "C" SEXP _arrow_Array__from_vector(SEXP x_sexp, SEXP s_type_sexp){
-BEGIN_CPP11
-       arrow::r::Input<SEXP>::type x(x_sexp);
-       arrow::r::Input<SEXP>::type s_type(s_type_sexp);
-       return cpp11::as_sexp(Array__from_vector(x, s_type));
-END_CPP11
-}
-// array_from_vector.cpp
-std::shared_ptr<arrow::ChunkedArray> ChunkedArray__from_list(cpp11::list 
chunks, SEXP s_type);
-extern "C" SEXP _arrow_ChunkedArray__from_list(SEXP chunks_sexp, SEXP 
s_type_sexp){
-BEGIN_CPP11
-       arrow::r::Input<cpp11::list>::type chunks(chunks_sexp);
-       arrow::r::Input<SEXP>::type s_type(s_type_sexp);
-       return cpp11::as_sexp(ChunkedArray__from_list(chunks, s_type));
-END_CPP11
-}
-// array_from_vector.cpp
-std::shared_ptr<arrow::Array> DictionaryArray__FromArrays(const 
std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Array>& 
indices, const std::shared_ptr<arrow::Array>& dict);
-extern "C" SEXP _arrow_DictionaryArray__FromArrays(SEXP type_sexp, SEXP 
indices_sexp, SEXP dict_sexp){
-BEGIN_CPP11
-       arrow::r::Input<const std::shared_ptr<arrow::DataType>&>::type 
type(type_sexp);
-       arrow::r::Input<const std::shared_ptr<arrow::Array>&>::type 
indices(indices_sexp);
-       arrow::r::Input<const std::shared_ptr<arrow::Array>&>::type 
dict(dict_sexp);
-       return cpp11::as_sexp(DictionaryArray__FromArrays(type, indices, dict));
-END_CPP11
-}
 // array_to_vector.cpp
 SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
 extern "C" SEXP _arrow_Array__as_vector(SEXP array_sexp){
@@ -569,6 +533,15 @@ BEGIN_CPP11
        return cpp11::as_sexp(ChunkedArray__ToString(x));
 END_CPP11
 }
+// chunkedarray.cpp
+std::shared_ptr<arrow::ChunkedArray> ChunkedArray__from_list(cpp11::list 
chunks, SEXP s_type);
+extern "C" SEXP _arrow_ChunkedArray__from_list(SEXP chunks_sexp, SEXP 
s_type_sexp){
+BEGIN_CPP11
+       arrow::r::Input<cpp11::list>::type chunks(chunks_sexp);
+       arrow::r::Input<SEXP>::type s_type(s_type_sexp);
+       return cpp11::as_sexp(ChunkedArray__from_list(chunks, s_type));
+END_CPP11
+}
 // compression.cpp
 std::shared_ptr<arrow::util::Codec> 
util___Codec__Create(arrow::Compression::type codec, R_xlen_t 
compression_level);
 extern "C" SEXP _arrow_util___Codec__Create(SEXP codec_sexp, SEXP 
compression_level_sexp){
@@ -2788,6 +2761,25 @@ BEGIN_CPP11
        return R_NilValue;
 END_CPP11
 }
+// r_to_arrow.cpp
+SEXP vec_to_arrow(SEXP x, SEXP s_type);
+extern "C" SEXP _arrow_vec_to_arrow(SEXP x_sexp, SEXP s_type_sexp){
+BEGIN_CPP11
+       arrow::r::Input<SEXP>::type x(x_sexp);
+       arrow::r::Input<SEXP>::type s_type(s_type_sexp);
+       return cpp11::as_sexp(vec_to_arrow(x, s_type));
+END_CPP11
+}
+// r_to_arrow.cpp
+std::shared_ptr<arrow::Array> DictionaryArray__FromArrays(const 
std::shared_ptr<arrow::DataType>& type, const std::shared_ptr<arrow::Array>& 
indices, const std::shared_ptr<arrow::Array>& dict);
+extern "C" SEXP _arrow_DictionaryArray__FromArrays(SEXP type_sexp, SEXP 
indices_sexp, SEXP dict_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<arrow::DataType>&>::type 
type(type_sexp);
+       arrow::r::Input<const std::shared_ptr<arrow::Array>&>::type 
indices(indices_sexp);
+       arrow::r::Input<const std::shared_ptr<arrow::Array>&>::type 
dict(dict_sexp);
+       return cpp11::as_sexp(DictionaryArray__FromArrays(type, indices, dict));
+END_CPP11
+}
 // recordbatch.cpp
 int RecordBatch__num_columns(const std::shared_ptr<arrow::RecordBatch>& x);
 extern "C" SEXP _arrow_RecordBatch__num_columns(SEXP x_sexp){
@@ -3488,6 +3480,14 @@ BEGIN_CPP11
        return R_NilValue;
 END_CPP11
 }
+// type_infer.cpp
+std::shared_ptr<arrow::DataType> Array__infer_type(SEXP x);
+extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){
+BEGIN_CPP11
+       arrow::r::Input<SEXP>::type x(x_sexp);
+       return cpp11::as_sexp(Array__infer_type(x));
+END_CPP11
+}
 extern "C" SEXP _arrow_Table__Reset(SEXP r6) {
 BEGIN_CPP11
 arrow::r::r6_reset_pointer<arrow::Table>(r6);
@@ -3558,10 +3558,6 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_FixedSizeListArray__value_offset", (DL_FUNC) 
&_arrow_FixedSizeListArray__value_offset, 2}, 
                { "_arrow_ListArray__raw_value_offsets", (DL_FUNC) 
&_arrow_ListArray__raw_value_offsets, 1}, 
                { "_arrow_LargeListArray__raw_value_offsets", (DL_FUNC) 
&_arrow_LargeListArray__raw_value_offsets, 1}, 
-               { "_arrow_Array__infer_type", (DL_FUNC) 
&_arrow_Array__infer_type, 1}, 
-               { "_arrow_Array__from_vector", (DL_FUNC) 
&_arrow_Array__from_vector, 2}, 
-               { "_arrow_ChunkedArray__from_list", (DL_FUNC) 
&_arrow_ChunkedArray__from_list, 2}, 
-               { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) 
&_arrow_DictionaryArray__FromArrays, 3}, 
                { "_arrow_Array__as_vector", (DL_FUNC) 
&_arrow_Array__as_vector, 1}, 
                { "_arrow_ChunkedArray__as_vector", (DL_FUNC) 
&_arrow_ChunkedArray__as_vector, 1}, 
                { "_arrow_RecordBatch__to_dataframe", (DL_FUNC) 
&_arrow_RecordBatch__to_dataframe, 2}, 
@@ -3590,6 +3586,7 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_ChunkedArray__Validate", (DL_FUNC) 
&_arrow_ChunkedArray__Validate, 1}, 
                { "_arrow_ChunkedArray__Equals", (DL_FUNC) 
&_arrow_ChunkedArray__Equals, 2}, 
                { "_arrow_ChunkedArray__ToString", (DL_FUNC) 
&_arrow_ChunkedArray__ToString, 1}, 
+               { "_arrow_ChunkedArray__from_list", (DL_FUNC) 
&_arrow_ChunkedArray__from_list, 2}, 
                { "_arrow_util___Codec__Create", (DL_FUNC) 
&_arrow_util___Codec__Create, 2}, 
                { "_arrow_util___Codec__name", (DL_FUNC) 
&_arrow_util___Codec__name, 1}, 
                { "_arrow_util___Codec__IsAvailable", (DL_FUNC) 
&_arrow_util___Codec__IsAvailable, 1}, 
@@ -3844,6 +3841,8 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_ExportSchema", (DL_FUNC) &_arrow_ExportSchema, 2}, 
                { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, 
                { "_arrow_ExportRecordBatch", (DL_FUNC) 
&_arrow_ExportRecordBatch, 3}, 
+               { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, 
+               { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) 
&_arrow_DictionaryArray__FromArrays, 3}, 
                { "_arrow_RecordBatch__num_columns", (DL_FUNC) 
&_arrow_RecordBatch__num_columns, 1}, 
                { "_arrow_RecordBatch__num_rows", (DL_FUNC) 
&_arrow_RecordBatch__num_rows, 1}, 
                { "_arrow_RecordBatch__schema", (DL_FUNC) 
&_arrow_RecordBatch__schema, 1}, 
@@ -3924,6 +3923,7 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_Table__from_dots", (DL_FUNC) 
&_arrow_Table__from_dots, 2}, 
                { "_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) 
&_arrow_GetCpuThreadPoolCapacity, 0}, 
                { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) 
&_arrow_SetCpuThreadPoolCapacity, 1}, 
+               { "_arrow_Array__infer_type", (DL_FUNC) 
&_arrow_Array__infer_type, 1}, 
                { "_arrow_Table__Reset", (DL_FUNC) &_arrow_Table__Reset, 1}, 
                { "_arrow_RecordBatch__Reset", (DL_FUNC) 
&_arrow_RecordBatch__Reset, 1}, 
                {NULL, NULL, 0}
diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h
index 2329db1..1d0e26e 100644
--- a/r/src/arrow_cpp11.h
+++ b/r/src/arrow_cpp11.h
@@ -23,6 +23,7 @@
 #undef Free
 
 #include <cpp11.hpp>
+#include <cpp11/altrep.hpp>
 
 #include "./nameof.h"
 
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 909ccfb..b37c01c 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -23,14 +23,14 @@
 
 #if defined(ARROW_R_WITH_ARROW)
 
-#include <limits>
-#include <memory>
-#include <utility>
-
 #include <arrow/buffer.h>  // for RBuffer definition below
 #include <arrow/result.h>
 #include <arrow/status.h>
 
+#include <limits>
+#include <memory>
+#include <utility>
+
 // forward declaration-only headers
 #include <arrow/c/abi.h>
 #include <arrow/compute/type_fwd.h>
@@ -49,7 +49,6 @@ namespace fs = ::arrow::fs;
 
 SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& 
chunked_array);
 SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
-std::shared_ptr<arrow::Array> Array__from_vector(SEXP x, SEXP type);
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP, SEXP);
 arrow::MemoryPool* gc_memory_pool();
 
@@ -64,6 +63,8 @@ arrow::MemoryPool* gc_memory_pool();
 #define DATAPTR(x) (void*)STRING_PTR(x)
 #endif
 
+#define VECTOR_PTR_RO(x) ((const SEXP*)DATAPTR_RO(x))
+
 namespace arrow {
 
 static inline void StopIfNotOk(const Status& status) {
@@ -81,13 +82,15 @@ auto ValueOrStop(R&& result) -> 
decltype(std::forward<R>(result).ValueOrDie()) {
 namespace r {
 
 std::shared_ptr<arrow::DataType> InferArrowType(SEXP x);
+std::shared_ptr<arrow::Array> vec_to_arrow__reuse_memory(SEXP x);
+bool can_reuse_memory(SEXP x, const std::shared_ptr<arrow::DataType>& type);
 
 Status count_fields(SEXP lst, int* out);
 
-std::shared_ptr<arrow::Array> Array__from_vector(
-    SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_inferred);
-
 void inspect(SEXP obj);
+std::shared_ptr<arrow::Array> vec_to_arrow(SEXP x,
+                                           const 
std::shared_ptr<arrow::DataType>& type,
+                                           bool type_inferred);
 
 // the integer64 sentinel
 constexpr int64_t NA_INT64 = std::numeric_limits<int64_t>::min();
diff --git a/r/src/chunkedarray.cpp b/r/src/chunkedarray.cpp
index 52ceff7..10c6e84 100644
--- a/r/src/chunkedarray.cpp
+++ b/r/src/chunkedarray.cpp
@@ -19,6 +19,7 @@
 
 #if defined(ARROW_R_WITH_ARROW)
 
+#include <arrow/builder.h>
 #include <arrow/chunked_array.h>
 
 // [[arrow::export]]
@@ -94,4 +95,45 @@ std::string ChunkedArray__ToString(const 
std::shared_ptr<arrow::ChunkedArray>& x
   return x->ToString();
 }
 
+// Build a ChunkedArray from an R list, converting each list element to one
+// chunk with arrow::r::vec_to_arrow().
+//
+// chunks: R list whose elements become the chunks
+// s_type: an Arrow DataType, or NULL to infer the type from the first
+//         element of `chunks`
+//
+// Stops with an R error when `chunks` is empty and no type was supplied.
+// An empty list with a supplied type gives a single zero-length chunk.
+// [[arrow::export]]
+std::shared_ptr<arrow::ChunkedArray> ChunkedArray__from_list(cpp11::list 
chunks,
+                                                             SEXP s_type) {
+  std::vector<std::shared_ptr<arrow::Array>> vec;
+
+  // the type might be NULL, in which case we need to infer it from the data
+  // we keep track of whether it was inferred or supplied
+  bool type_inferred = Rf_isNull(s_type);
+  R_xlen_t n = XLENGTH(chunks);
+
+  std::shared_ptr<arrow::DataType> type;
+  if (type_inferred) {
+    if (n == 0) {
+      cpp11::stop("type must be specified for empty list");
+    }
+    type = arrow::r::InferArrowType(VECTOR_ELT(chunks, 0));
+  } else {
+    type = cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(s_type);
+  }
+
+  if (n == 0) {
+    // no data: finish a freshly made builder to get an empty array of `type`
+    std::shared_ptr<arrow::Array> array;
+    std::unique_ptr<arrow::ArrayBuilder> type_builder;
+    StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder));
+    StopIfNotOk(type_builder->Finish(&array));
+    vec.push_back(array);
+  } else {
+    // the first chunk might differ from the rest of the loop
+    // because we might have inferred the type from the first element of
+    // the list
+    //
+    // this only really matters for dictionary arrays
+    vec.push_back(arrow::r::vec_to_arrow(chunks[0], type, type_inferred));
+
+    // remaining chunks always use `type` as given, never as inferred
+    for (R_xlen_t i = 1; i < n; i++) {
+      vec.push_back(arrow::r::vec_to_arrow(chunks[i], type, false));
+    }
+  }
+
+  return std::make_shared<arrow::ChunkedArray>(std::move(vec));
+}
+
 #endif
diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp
new file mode 100644
index 0000000..2137e0f
--- /dev/null
+++ b/r/src/r_to_arrow.cpp
@@ -0,0 +1,1054 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#include "./arrow_vctrs.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+#include <arrow/array/builder_base.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_dict.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/type_traits.h>
+#include <arrow/util/bitmap_writer.h>
+#include <arrow/util/checked_cast.h>
+#include <arrow/util/converter.h>
+
+namespace arrow {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+using internal::Converter;
+using internal::DictionaryConverter;
+using internal::ListConverter;
+using internal::PrimitiveConverter;
+using internal::StructConverter;
+
+using internal::MakeChunker;
+using internal::MakeConverter;
+
+namespace r {
+
+// Options type of the R -> Arrow converters: it is the second template
+// argument of Converter<SEXP, RConversionOptions> used by RConverter.
+struct RConversionOptions {
+  // NOTE(review): `strict` and `size` have no initializer here, so a
+  // default-initialized object leaves them unset; callers presumably assign
+  // every field before use - confirm.
+  RConversionOptions() = default;
+
+  // presumably the Arrow type to convert to - confirm against callers
+  std::shared_ptr<arrow::DataType> type;
+  bool strict;
+  int64_t size;
+};
+
+// Tags produced by GetVectorType(): the R storage type refined by the class
+// attribute where needed.
+enum RVectorType {
+  BOOLEAN,    // LGLSXP
+  UINT8,      // RAWSXP
+  INT32,      // INTSXP that is neither a factor nor a Date
+  FLOAT64,    // REALSXP with none of the classes listed below
+  INT64,      // REALSXP inheriting "integer64"
+  COMPLEX,    // CPLXSXP
+  STRING,     // STRSXP
+  DATAFRAME,  // VECSXP inheriting "data.frame"
+  DATE_INT,   // INTSXP inheriting "Date"
+  DATE_DBL,   // REALSXP inheriting "Date"
+  TIME,       // REALSXP inheriting "difftime"
+  POSIXCT,    // REALSXP inheriting "POSIXct"
+  POSIXLT,    // VECSXP inheriting "POSIXlt"
+  BINARY,     // VECSXP inheriting "arrow_binary"
+  LIST,       // any other VECSXP
+  FACTOR,     // INTSXP inheriting "factor"
+
+  OTHER  // every remaining storage type
+};
+
+// TYPEOF() alone cannot tell e.g. a factor from a plain integer vector, so
+// the storage type is combined with the class attribute into a single
+// RVectorType tag. Arrow types are not used as tags because R vectors and
+// Arrow types do not map 1-1.
+RVectorType GetVectorType(SEXP x) {
+  const int sexp_type = TYPEOF(x);
+
+  // storage types that need no class inspection
+  if (sexp_type == LGLSXP) return BOOLEAN;
+  if (sexp_type == RAWSXP) return UINT8;
+  if (sexp_type == STRSXP) return STRING;
+  if (sexp_type == CPLXSXP) return COMPLEX;
+
+  // integer storage: factor is tested before Date
+  if (sexp_type == INTSXP) {
+    if (Rf_inherits(x, "factor")) return FACTOR;
+    if (Rf_inherits(x, "Date")) return DATE_INT;
+    return INT32;
+  }
+
+  // double storage: the first matching class wins
+  if (sexp_type == REALSXP) {
+    if (Rf_inherits(x, "Date")) return DATE_DBL;
+    if (Rf_inherits(x, "integer64")) return INT64;
+    if (Rf_inherits(x, "POSIXct")) return POSIXCT;
+    if (Rf_inherits(x, "difftime")) return TIME;
+    return FLOAT64;
+  }
+
+  // generic vectors (lists)
+  if (sexp_type == VECSXP) {
+    if (Rf_inherits(x, "data.frame")) return DATAFRAME;
+    if (Rf_inherits(x, "POSIXlt")) return POSIXLT;
+    if (Rf_inherits(x, "arrow_binary")) return BINARY;
+    return LIST;
+  }
+
+  return OTHER;
+}
+
+// is_NA<T>(value): whether `value` counts as missing for element type T.
+// The primary template is only declared; the explicit specializations below
+// provide the definitions.
+template <typename T>
+bool is_NA(T value);
+
+// integer: compared against NA_INTEGER
+template <>
+bool is_NA<int>(int value) {
+  return value == NA_INTEGER;
+}
+
+// double: decided by R's ISNA() macro
+// NOTE(review): ISNA() presumably matches NA_real_ only, not other NaN
+// values - confirm against the R API
+template <>
+bool is_NA<double>(double value) {
+  return ISNA(value);
+}
+
+// raw bytes: never missing
+template <>
+bool is_NA<uint8_t>(uint8_t value) {
+  return false;
+}
+
+// logical: compared against NA_LOGICAL
+template <>
+bool is_NA<cpp11::r_bool>(cpp11::r_bool value) {
+  return value == NA_LOGICAL;
+}
+
+// string: compared against NA_STRING
+template <>
+bool is_NA<cpp11::r_string>(cpp11::r_string value) {
+  return value == NA_STRING;
+}
+
+// list element: a NULL element counts as missing
+template <>
+bool is_NA<SEXP>(SEXP value) {
+  return Rf_isNull(value);
+}
+
+// 64 bit integer: compared against the NA_INT64 sentinel
+template <>
+bool is_NA<int64_t>(int64_t value) {
+  return value == NA_INT64;
+}
+
+// Walks the first `size` elements of an R vector whose logical element type
+// is T, and calls one of two callbacks per element.
+template <typename T>
+struct RVectorVisitor {
+  // element type of the cpp11 vector wrapper: int64_t values are read as
+  // double and reinterpreted by GetValue(), every other T is read as itself
+  using data_type =
+      typename std::conditional<std::is_same<T, int64_t>::value, double, 
T>::type;
+  using r_vector_type = cpp11::r_vector<data_type>;
+
+  // For each of the first `size` elements of `x`: call append_null() when
+  // is_NA<T>() holds for the element, append_value(element) otherwise.
+  // Stops at, and returns, the first non-OK Status from a callback.
+  template <typename AppendNull, typename AppendValue>
+  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
+                      AppendValue&& append_value) {
+    r_vector_type values(x);
+    auto it = values.begin();
+
+    for (R_xlen_t i = 0; i < size; i++, ++it) {
+      auto value = GetValue(*it);
+
+      if (is_NA<T>(value)) {
+        RETURN_NOT_OK(append_null());
+      } else {
+        RETURN_NOT_OK(append_value(value));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Turns a stored element into a T; the generic version is a plain
+  // conversion.
+  static T GetValue(data_type x) { return x; }
+};
+
+// int64_t specialization: the 64 bit integer is carried in the storage of a
+// double, so its bytes are copied out instead of converting numerically.
+template <>
+int64_t RVectorVisitor<int64_t>::GetValue(double x) {
+  int64_t value;
+  memcpy(&value, &x, sizeof(int64_t));
+  return value;
+}
+
+// Common base of the R -> Arrow converters in this file. Every entry point
+// defaults to a NotImplemented status whose message names that entry point;
+// the RPrimitiveConverter specializations override Extend().
+class RConverter : public Converter<SEXP, RConversionOptions> {
+ public:
+  virtual Status Append(SEXP) { return Status::NotImplemented("Append"); }
+
+  // the message names Extend itself (it used to say "ExtendMasked")
+  virtual Status Extend(SEXP values, int64_t size) {
+    return Status::NotImplemented("Extend");
+  }
+
+  virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) {
+    return Status::NotImplemented("ExtendMasked");
+  }
+};
+
+// Primary template, declared only: the partial specializations that follow
+// are selected through the `Enable` parameter.
+template <typename T, typename Enable = void>
+class RPrimitiveConverter;
+
+// Narrow a 64 bit signed value to the integer type T, or report Invalid
+// when the value does not fit in T.
+template <typename T>
+Result<T> CIntFromRScalarImpl(int64_t value) {
+  const bool fits = value >= std::numeric_limits<T>::min() &&
+                    value <= std::numeric_limits<T>::max();
+  if (!fits) {
+    return Status::Invalid("value outside of range");
+  }
+  return static_cast<T>(value);
+}
+
+// uint64_t target: only negative inputs are rejected.
+template <>
+Result<uint64_t> CIntFromRScalarImpl<uint64_t>(int64_t value) {
+  if (value >= 0) {
+    return static_cast<uint64_t>(value);
+  }
+  return Status::Invalid("value outside of range");
+}
+
+// utility to convert R single values from (int, raw, double and int64) vectors
+// to arrow integers and floating point
+//
+// Each Convert() overload takes a pointer to the target Arrow type, used
+// only for overload selection, and the value to convert. It returns the
+// converted value or an Invalid status.
+struct RConvert {
+  // ---- convert to an arrow integer
+  // The value is handed to CIntFromRScalarImpl(), which takes an int64_t and
+  // range-checks it against the target C type.
+  // NOTE(review): a double `from` is therefore converted to int64_t
+  // implicitly before the range check - confirm this is intended.
+  template <typename Type, typename From>
+  static enable_if_integer<Type, Result<typename Type::c_type>> Convert(Type*,
+                                                                        From 
from) {
+    return CIntFromRScalarImpl<typename Type::c_type>(from);
+  }
+
+  // ---- convert R integer types to double
+  // accepted only within [-2^53, 2^53]
+  template <typename Type, typename From>
+  static enable_if_t<std::is_same<Type, const DoubleType>::value &&
+                         !std::is_same<From, double>::value,
+                     Result<typename Type::c_type>>
+  Convert(Type*, From from) {
+    constexpr int64_t kDoubleMax = 1LL << 53;
+    constexpr int64_t kDoubleMin = -(1LL << 53);
+
+    if (from < kDoubleMin || from > kDoubleMax) {
+      return Status::Invalid("Integer value ", from, " is outside of the range 
exactly",
+                             " representable by a IEEE 754 double precision 
value");
+    }
+    return static_cast<double>(from);
+  }
+
+  // ---- convert double to double
+  // returned unchanged
+  template <typename Type, typename From>
+  static enable_if_t<std::is_same<Type, const DoubleType>::value &&
+                         std::is_same<From, double>::value,
+                     Result<typename Type::c_type>>
+  Convert(Type*, From from) {
+    return from;
+  }
+
+  // ---- convert R integer types to float
+  // accepted only within [-2^24, 2^24]
+  template <typename Type, typename From>
+  static enable_if_t<std::is_same<Type, const FloatType>::value &&
+                         !std::is_same<From, double>::value,
+                     Result<typename Type::c_type>>
+  Convert(Type*, From from) {
+    constexpr int64_t kFloatMax = 1LL << 24;
+    constexpr int64_t kFloatMin = -(1LL << 24);
+
+    if (from < kFloatMin || from > kFloatMax) {
+      return Status::Invalid("Integer value ", from, " is outside of the range 
exactly",
+                             " representable by a IEEE 754 single precision 
value");
+    }
+    return static_cast<float>(from);
+  }
+
+  // ---- convert double to float
+  // plain static_cast, no range check
+  template <typename Type, typename From>
+  static enable_if_t<std::is_same<Type, const FloatType>::value &&
+                         std::is_same<From, double>::value,
+                     Result<typename Type::c_type>>
+  Convert(Type*, From from) {
+    return static_cast<float>(from);
+  }
+
+  // ---- convert to half float: not implemented
+  // always returns Invalid
+  template <typename Type, typename From>
+  static enable_if_t<std::is_same<Type, const HalfFloatType>::value,
+                     Result<typename Type::c_type>>
+  Convert(Type*, From from) {
+    return Status::Invalid("Cannot convert to Half Float");
+  }
+};
+
+// Converter for the Arrow null type: the R values are ignored and `size`
+// nulls are appended.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_null<T>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  Status Extend(SEXP, int64_t size) override {
+    return this->primitive_builder_->AppendNulls(size);
+  }
+};
+
+// Converter for Arrow integer and floating point types. Accepts R raw,
+// integer, double and integer64 vectors, as classified by GetVectorType().
+template <typename T>
+class RPrimitiveConverter<
+    T, enable_if_t<is_integer_type<T>::value || is_floating_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of `x`, dispatching on the kind of R
+  // vector. Returns Invalid("cannot convert") for any other kind of vector.
+  Status Extend(SEXP x, int64_t size) override {
+    auto rtype = GetVectorType(x);
+    switch (rtype) {
+      case UINT8:
+        return AppendRangeDispatch<unsigned char>(x, size);
+      case INT32:
+        return AppendRangeDispatch<int>(x, size);
+      case FLOAT64:
+        return AppendRangeDispatch<double>(x, size);
+      case INT64:
+        return AppendRangeDispatch<int64_t>(x, size);
+
+      default:
+        break;
+    }
+    // TODO: mention T in the error
+    return Status::Invalid("cannot convert");
+  }
+
+ private:
+  // R element type differs from the Arrow C type: every non-missing value
+  // goes through RConvert::Convert(), whose failure aborts the loop.
+  template <typename r_value_type>
+  Status AppendRangeLoopDifferentType(SEXP x, int64_t size) {
+    RETURN_NOT_OK(this->Reserve(size));
+
+    auto append_value = [this](r_value_type value) {
+      ARROW_ASSIGN_OR_RAISE(auto converted,
+                            RConvert::Convert(this->primitive_type_, value));
+      this->primitive_builder_->UnsafeAppend(converted);
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<r_value_type>::Visit(x, size, append_null, 
append_value);
+  }
+
+  // Same element type, regular (non ALTREP) vector: read straight from the
+  // data pointer. Everything before the first missing value is appended in
+  // bulk; the remainder is appended one element at a time.
+  template <typename r_value_type>
+  Status AppendRangeSameTypeNotALTREP(SEXP x, int64_t size) {
+    auto p = reinterpret_cast<const r_value_type*>(DATAPTR_RO(x));
+    auto p_end = p + size;
+
+    auto first_na = std::find_if(p, p_end, is_NA<r_value_type>);
+
+    if (first_na == p_end) {
+      // no nulls, so we can use AppendValues() directly
+      return this->primitive_builder_->AppendValues(p, p_end);
+    }
+
+    // Append all values up until the first NULL
+    RETURN_NOT_OK(this->primitive_builder_->AppendValues(p, first_na));
+
+    // loop for the remaining
+    RETURN_NOT_OK(this->primitive_builder_->Reserve(p_end - first_na));
+    p = first_na;
+    for (; p < p_end; ++p) {
+      r_value_type value = *p;
+      if (is_NA<r_value_type>(value)) {
+        this->primitive_builder_->UnsafeAppendNull();
+      } else {
+        this->primitive_builder_->UnsafeAppend(value);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Same element type, ALTREP vector: elements are read through the cpp11
+  // vector iterator rather than through the data pointer.
+  template <typename r_value_type>
+  Status AppendRangeSameTypeALTREP(SEXP x, int64_t size) {
+    // if it is altrep, then we use cpp11 looping
+    // without needing to convert
+    RETURN_NOT_OK(this->primitive_builder_->Reserve(size));
+    typename RVectorVisitor<r_value_type>::r_vector_type vec(x);
+    auto it = vec.begin();
+    for (R_xlen_t i = 0; i < size; i++, ++it) {
+      r_value_type value = RVectorVisitor<r_value_type>::GetValue(*it);
+      if (is_NA<r_value_type>(value)) {
+        this->primitive_builder_->UnsafeAppendNull();
+      } else {
+        this->primitive_builder_->UnsafeAppend(value);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Picks one of the three strategies above. The type comparison sits in an
+  // ordinary `if`, so all three helpers are instantiated for every
+  // r_value_type.
+  template <typename r_value_type>
+  Status AppendRangeDispatch(SEXP x, int64_t size) {
+    if (std::is_same<typename T::c_type, r_value_type>::value) {
+      if (!ALTREP(x)) {
+        return AppendRangeSameTypeNotALTREP<r_value_type>(x, size);
+      } else {
+        return AppendRangeSameTypeALTREP<r_value_type>(x, size);
+      }
+    }
+
+    // the underlying types differ: convert element by element
+    return AppendRangeLoopDifferentType<r_value_type>(x, size);
+  }
+};
+
+// Converter for the Arrow boolean type; only R logical vectors are accepted.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_boolean_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of `x`. Returns Invalid when `x` is not
+  // a logical vector.
+  Status Extend(SEXP x, int64_t size) override {
+    auto rtype = GetVectorType(x);
+    if (rtype != BOOLEAN) {
+      return Status::Invalid("Expecting a logical vector");
+    }
+    RETURN_NOT_OK(this->Reserve(size));
+
+    auto append_value = [this](cpp11::r_bool value) {
+      // true only when the logical compares equal to 1
+      this->primitive_builder_->UnsafeAppend(value == 1);
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<cpp11::r_bool>::Visit(x, size, append_null, 
append_value);
+  }
+};
+
+// Converter for Arrow date types (the helpers below are overloaded on
+// Date32Type and Date64Type). Accepts R Date vectors stored as integer or
+// double, and POSIXct vectors.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_date_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of `x`. Returns Invalid for any other
+  // kind of R vector; note that space is reserved before that check.
+  Status Extend(SEXP x, int64_t size) override {
+    RETURN_NOT_OK(this->Reserve(size));
+
+    switch (GetVectorType(x)) {
+      case DATE_INT:
+        return AppendRange_Date<int>(x, size);
+
+      case DATE_DBL:
+        return AppendRange_Date<double>(x, size);
+
+      case POSIXCT:
+        return AppendRange_Posixct(x, size);
+
+      default:
+        break;
+    }
+
+    return Status::Invalid("cannot convert to date type ");
+  }
+
+ private:
+  // Date vectors: each non-missing value goes through FromRDate().
+  // FromRDate() takes an `int`, so a double value is converted to int in the
+  // call.
+  template <typename r_value_type>
+  Status AppendRange_Date(SEXP x, int64_t size) {
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    auto append_value = [this](r_value_type value) {
+      this->primitive_builder_->UnsafeAppend(FromRDate(this->primitive_type_, 
value));
+      return Status::OK();
+    };
+
+    return RVectorVisitor<r_value_type>::Visit(x, size, append_null, 
append_value);
+  }
+
+  // POSIXct vectors: each non-missing value goes through FromPosixct().
+  Status AppendRange_Posixct(SEXP x, int64_t size) {
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    auto append_value = [this](double value) {
+      
this->primitive_builder_->UnsafeAppend(FromPosixct(this->primitive_type_, 
value));
+      return Status::OK();
+    };
+
+    return RVectorVisitor<double>::Visit(x, size, append_null, append_value);
+  }
+
+  // Date32: the R value is passed through unchanged
+  static int FromRDate(const Date32Type*, int from) { return from; }
+
+  // Date64: the R value is scaled by the number of milliseconds in a day
+  static int64_t FromRDate(const Date64Type*, int from) {
+    constexpr int64_t kMilliSecondsPerDay = 86400000;
+    return from * kMilliSecondsPerDay;
+  }
+
+  // Date32: the POSIXct value is divided by the number of seconds in a day;
+  // the double quotient is converted to int on return
+  static int FromPosixct(const Date32Type*, double from) {
+    constexpr int64_t kSecondsPerDay = 86400;
+    return from / kSecondsPerDay;
+  }
+
+  // Date64: the POSIXct value is multiplied by 1000
+  static int64_t FromPosixct(const Date64Type*, double from) { return from * 
1000; }
+};
+
+// Number of `unit` ticks in one second; 0 for an unrecognised unit.
+int64_t get_TimeUnit_multiplier(TimeUnit::type unit) {
+  int64_t ticks_per_second = 0;
+  switch (unit) {
+    case TimeUnit::SECOND:
+      ticks_per_second = 1;
+      break;
+    case TimeUnit::MILLI:
+      ticks_per_second = 1000;
+      break;
+    case TimeUnit::MICRO:
+      ticks_per_second = 1000000;
+      break;
+    case TimeUnit::NANO:
+      ticks_per_second = 1000000000;
+      break;
+    default:
+      break;
+  }
+  return ticks_per_second;
+}
+
+// Converter for Arrow time types from R difftime vectors.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_time_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of the difftime vector `x`, rescaled
+  // from its "units" attribute to the time unit of the target type.
+  //
+  // Returns Invalid when `x` is not a difftime vector, or when its "units"
+  // attribute is missing, not a character vector, or not one of
+  // secs/mins/hours/days/weeks. Nothing is reserved or appended then.
+  Status Extend(SEXP x, int64_t size) override {
+    auto rtype = GetVectorType(x);
+    if (rtype != TIME) {
+      return Status::Invalid("Invalid conversion to time");
+    }
+
+    // STRING_ELT() may only be applied to a non-empty character vector, so a
+    // missing or malformed attribute is rejected here instead of being read
+    SEXP units = Rf_getAttrib(x, symbols::units);
+    if (TYPEOF(units) != STRSXP || XLENGTH(units) < 1) {
+      return Status::Invalid("unknown difftime unit");
+    }
+
+    // multiplier to get the number of seconds from the value stored in the
+    // R vector
+    int difftime_multiplier;
+    std::string unit(CHAR(STRING_ELT(units, 0)));
+    if (unit == "secs") {
+      difftime_multiplier = 1;
+    } else if (unit == "mins") {
+      difftime_multiplier = 60;
+    } else if (unit == "hours") {
+      difftime_multiplier = 3600;
+    } else if (unit == "days") {
+      difftime_multiplier = 86400;
+    } else if (unit == "weeks") {
+      difftime_multiplier = 604800;
+    } else {
+      return Status::Invalid("unknown difftime unit");
+    }
+
+    // then multiply the seconds by this to match the time unit
+    auto multiplier =
+        get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier;
+
+    // reserve only once the input is known to be convertible
+    RETURN_NOT_OK(this->Reserve(size));
+
+    auto append_value = [this, multiplier](double value) {
+      auto converted = static_cast<typename T::c_type>(value * multiplier);
+      this->primitive_builder_->UnsafeAppend(converted);
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<double>::Visit(x, size, append_null, append_value);
+  }
+};
+
+// Converter for Arrow timestamp types; only R POSIXct vectors are accepted.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_timestamp_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of `x`, each multiplied by the
+  // per-second multiplier of the target time unit. Returns Invalid when `x`
+  // is not POSIXct; note that space is reserved before that check.
+  Status Extend(SEXP x, int64_t size) override {
+    RETURN_NOT_OK(this->Reserve(size));
+
+    RVectorType rtype = GetVectorType(x);
+    if (rtype != POSIXCT) {
+      return Status::Invalid("Invalid conversion to timestamp");
+    }
+
+    int64_t multiplier = 
get_TimeUnit_multiplier(this->primitive_type_->unit());
+
+    auto append_value = [this, multiplier](double value) {
+      // scaled as a double, then cast to the C type of the timestamp
+      auto converted = static_cast<typename T::c_type>(value * multiplier);
+      this->primitive_builder_->UnsafeAppend(converted);
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<double>::Visit(x, size, append_null, append_value);
+  }
+};
+
+// Converter for Arrow decimal types: conversion from R is not implemented,
+// Extend() always returns NotImplemented.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_decimal_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  Status Extend(SEXP x, int64_t size) override {
+    return Status::NotImplemented("Extend");
+  }
+};
+
+// Validate that `x` can be converted to an Arrow binary type: it is either
+// classed "arrow_binary" (accepted without looking at its elements), or a
+// plain list whose first `size` elements are all raw vectors.
+Status check_binary(SEXP x, int64_t size) {
+  const RVectorType kind = GetVectorType(x);
+
+  if (kind == BINARY) {
+    return Status::OK();
+  }
+  if (kind != LIST) {
+    return Status::Invalid("invalid R type to convert to binary");
+  }
+
+  // plain list: check this is a list of raw vectors
+  const SEXP* elements = VECTOR_PTR_RO(x);
+  for (R_xlen_t i = 0; i < size; ++i) {
+    if (TYPEOF(elements[i]) != RAWSXP) {
+      return Status::Invalid("invalid R type to convert to binary");
+    }
+  }
+  return Status::OK();
+}
+
+// Converter for Arrow binary types from R lists accepted by check_binary().
+template <typename T>
+class RPrimitiveConverter<T, enable_if_binary<T>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  using OffsetType = typename T::offset_type;
+
+  // Append the first `size` elements of `x`: a NULL element becomes null,
+  // any other element is appended as the bytes of a raw vector.
+  // Returns the error from check_binary() when `x` is not acceptable.
+  Status Extend(SEXP x, int64_t size) override {
+    RETURN_NOT_OK(this->Reserve(size));
+    RETURN_NOT_OK(check_binary(x, size));
+
+    auto append_value = [this](SEXP raw) {
+      R_xlen_t n = XLENGTH(raw);
+      // make room for the bytes of this element before the unchecked append
+      ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n));
+      this->primitive_builder_->UnsafeAppend(RAW_RO(raw), 
static_cast<OffsetType>(n));
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<SEXP>::Visit(x, size, append_null, append_value);
+  }
+};
+
+// Converter for the Arrow fixed size binary type from R lists accepted by
+// check_binary().
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<std::is_same<T, 
FixedSizeBinaryType>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Append the first `size` elements of `x`: a NULL element becomes null,
+  // any other element must be exactly byte_width() bytes long.
+  // Returns Invalid("invalid size") on a length mismatch, or the error from
+  // check_binary() when `x` is not acceptable.
+  Status Extend(SEXP x, int64_t size) override {
+    RETURN_NOT_OK(this->Reserve(size));
+    RETURN_NOT_OK(check_binary(x, size));
+
+    auto append_value = [this](SEXP raw) {
+      R_xlen_t n = XLENGTH(raw);
+
+      if (n != this->primitive_builder_->byte_width()) {
+        return Status::Invalid("invalid size");
+      }
+      ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n));
+      this->primitive_builder_->UnsafeAppend(RAW_RO(raw));
+      return Status::OK();
+    };
+    auto append_null = [this]() {
+      this->primitive_builder_->UnsafeAppendNull();
+      return Status::OK();
+    };
+    return RVectorVisitor<SEXP>::Visit(x, size, append_null, append_value);
+  }
+};
+
+// Converter for string-like types: converts the first `size` elements of an
+// R character vector, mapping NA_character_ to null.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_string_like<T>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  using OffsetType = typename T::offset_type;
+
+  // Returns Status::Invalid when `x` is not classified as STRING by
+  // GetVectorType(); otherwise appends `size` values to the builder.
+  Status Extend(SEXP x, int64_t size) override {
+    if (GetVectorType(x) != STRING) {
+      return Status::Invalid("Expecting a character vector");
+    }
+
+    cpp11::strings s(arrow::r::utf8_strings(x));
+    // reserve exactly as many slots as the loops below append
+    RETURN_NOT_OK(this->primitive_builder_->Reserve(size));
+
+    // we know all the R strings are utf8 already, so we can get
+    // a definite size and then use UnsafeAppend*()
+    int64_t total_length = 0;
+    auto it = s.begin();
+    for (R_xlen_t i = 0; i < size; i++, ++it) {
+      cpp11::r_string si = *it;
+      total_length += cpp11::is_na(si) ? 0 : si.size();
+    }
+    RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length));
+
+    // append
+    it = s.begin();
+    for (R_xlen_t i = 0; i < size; i++, ++it) {
+      cpp11::r_string si = *it;
+      if (cpp11::is_na(si)) {
+        this->primitive_builder_->UnsafeAppendNull();
+      } else {
+        this->primitive_builder_->UnsafeAppend(CHAR(si), si.size());
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
+// Converter specialization selected for duration types. Conversion from R
+// vectors is not implemented here: Extend() unconditionally fails.
+template <typename T>
+class RPrimitiveConverter<T, enable_if_t<is_duration_type<T>::value>>
+    : public PrimitiveConverter<T, RConverter> {
+ public:
+  // Always returns Status::NotImplemented; `x` and `size` are not inspected.
+  Status Extend(SEXP x, int64_t size) override {
+    // TODO: look in lubridate
+    return Status::NotImplemented("Extend");
+  }
+};
+
+template <typename T>
+class RListConverter;
+
+template <typename U, typename Enable = void>
+class RDictionaryConverter;
+
+// Dictionary converter specialization for value types that have a C type.
+// Not implemented: Extend() unconditionally fails.
+template <typename U>
+class RDictionaryConverter<U, enable_if_has_c_type<U>>
+    : public DictionaryConverter<U, RConverter> {
+ public:
+  // Always returns Status::NotImplemented; `x` and `size` are not inspected.
+  Status Extend(SEXP x, int64_t size) override {
+    return Status::NotImplemented("Extend");
+  }
+};
+
+// Dictionary converter for string-view value types: converts an R factor
+// into a dictionary array whose dictionary holds the factor levels.
+template <typename ValueType>
+class RDictionaryConverter<ValueType, enable_if_has_string_view<ValueType>>
+    : public DictionaryConverter<ValueType, RConverter> {
+ public:
+  using BuilderType = DictionaryBuilder<ValueType>;
+
+  // Appends the first `size` elements of the factor `x`.
+  // Returns Status::Invalid when `x` is not a factor or when a factor code
+  // does not index one of the levels.
+  Status Extend(SEXP x, int64_t size) override {
+    // validate before touching the levels, so that a non-factor input is
+    // reported through the Status instead of failing while reading levels
+    if (GetVectorType(x) != FACTOR) {
+      return Status::Invalid("invalid R type to convert to dictionary");
+    }
+
+    // first we need to handle the levels
+    cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol));
+    auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false);
+    RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array));
+
+    // then we can proceed
+    RETURN_NOT_OK(this->Reserve(size));
+
+    const R_xlen_t n_levels = levels.size();
+    // `levels` outlives the Visit() call below, so capture it by reference
+    auto append_value = [this, &levels, n_levels](int value) {
+      // factor codes are 1-based indices into the levels
+      if (value < 1 || value > n_levels) {
+        return Status::Invalid("invalid factor code: ", value);
+      }
+      SEXP s = STRING_ELT(levels, value - 1);
+      return this->value_builder_->Append(CHAR(s));
+    };
+    auto append_null = [this]() { return this->value_builder_->AppendNull(); };
+    return RVectorVisitor<int>::Visit(x, size, append_null, append_value);
+  }
+
+  // Finishes the builder; when the requested dictionary type is ordered but
+  // the built array's type is not, the array's type is replaced by an
+  // ordered dictionary type with the same index and value types.
+  Result<std::shared_ptr<Array>> ToArray() override {
+    ARROW_ASSIGN_OR_RAISE(auto result, this->builder_->Finish());
+
+    auto result_type = checked_cast<DictionaryType*>(result->type().get());
+    if (this->dict_type_->ordered() && !result_type->ordered()) {
+      // TODO: we should not have to do that, there is probably something wrong
+      //       in the DictionaryBuilder code
+      result->data()->type =
+          arrow::dictionary(result_type->index_type(), result_type->value_type(), true);
+    }
+
+    return result;
+  }
+};
+
+// Maps an Arrow type to the R converter class that handles it. The primary
+// template is only declared; each supported family has a specialization.
+template <typename T, typename Enable = void>
+struct RConverterTrait;
+
+// Types that are neither nested, interval nor extension types are handled
+// by RPrimitiveConverter.
+template <typename T>
+struct RConverterTrait<
+    T, enable_if_t<!is_nested_type<T>::value && !is_interval_type<T>::value &&
+                   !is_extension_type<T>::value>> {
+  using type = RPrimitiveConverter<T>;
+};
+
+// List-like types are handled by RListConverter.
+template <typename T>
+struct RConverterTrait<T, enable_if_list_like<T>> {
+  using type = RListConverter<T>;
+};
+
+// Converter for list-like types: every element of the R list `x` becomes
+// one list value whose content is converted by the value converter.
+template <typename T>
+class RListConverter : public ListConverter<T, RConverter, RConverterTrait> {
+ public:
+  // Reserves `size` slots, then returns Status::Invalid unless
+  // GetVectorType() classifies `x` as LIST; otherwise visits the elements.
+  Status Extend(SEXP x, int64_t size) override {
+    RETURN_NOT_OK(this->Reserve(size));
+
+    if (GetVectorType(x) != LIST) {
+      return Status::Invalid("Cannot convert to list type");
+    }
+
+    auto on_null = [this]() { return this->list_builder_->AppendNull(); };
+    auto on_value = [this](SEXP value) {
+      // TODO: this should always use vctrs::short_vec_size
+      //       but that introduced a regression:
+      //       https://github.com/apache/arrow/pull/8650#issuecomment-786940734
+      const bool is_bare_list =
+          TYPEOF(value) == VECSXP && !Rf_inherits(value, "data.frame");
+      const int n = is_bare_list ? Rf_length(value) : vctrs::short_vec_size(value);
+
+      RETURN_NOT_OK(this->list_builder_->ValidateOverflow(n));
+      RETURN_NOT_OK(this->list_builder_->Append());
+      return this->value_converter_->Extend(value, n);
+    };
+    return RVectorVisitor<SEXP>::Visit(x, size, on_null, on_value);
+  }
+};
+
+// Forward declaration: the trait below needs the name before the class is
+// defined.
+class RStructConverter;
+
+// Struct types are handled by RStructConverter.
+template <>
+struct RConverterTrait<StructType> {
+  using type = RStructConverter;
+};
+
+// Converter for struct types: turns a data frame (or POSIXlt object) into a
+// struct array, converting each column with the matching child converter.
+class RStructConverter : public StructConverter<RConverter, RConverterTrait> {
+ public:
+  // Appends `size` struct values built from the columns of `x`.
+  //
+  // Fails with Status::Invalid when `x` is neither a data.frame nor a
+  // POSIXlt, or when a child converter fails (the child's message is wrapped
+  // with the column position and field name). Fails with Status::RError when
+  // the number or names of the columns do not match the struct fields, or
+  // when a column holds fewer than `size` elements.
+  Status Extend(SEXP x, int64_t size) override {
+    // check that x is compatible, before it is queried as a vector
+    if (!Rf_inherits(x, "data.frame") && !Rf_inherits(x, "POSIXlt")) {
+      return Status::Invalid("Can only convert data frames to Struct type");
+    }
+
+    const R_xlen_t n_columns = XLENGTH(x);
+
+    const auto& fields = this->struct_type_->fields();
+    if (n_columns != static_cast<R_xlen_t>(fields.size())) {
+      return Status::RError("Number of fields in struct (", fields.size(),
+                            ") incompatible with number of columns in the data frame (",
+                            n_columns, ")");
+    }
+
+    cpp11::strings x_names = Rf_getAttrib(x, R_NamesSymbol);
+
+    // column names must match the field names, position by position; the
+    // string access goes through the R API, hence unwind_protect
+    RETURN_NOT_OK(cpp11::unwind_protect([&] {
+      for (R_xlen_t i = 0; i < n_columns; i++) {
+        const char* name_i = arrow::r::unsafe::utf8_string(x_names[i]);
+        const auto& field_name = fields[i]->name();
+        if (field_name != name_i) {
+          return Status::RError(
+              "Field name in position ", i, " (", field_name,
+              ") does not match the name of the column of the data frame (", name_i,
+              ")");
+        }
+      }
+
+      return Status::OK();
+    }));
+
+    // every column must provide at least `size` elements
+    for (R_xlen_t i = 0; i < n_columns; i++) {
+      SEXP x_i = VECTOR_ELT(x, i);
+      if (vctrs::short_vec_size(x_i) < size) {
+        return Status::RError("Degenerated data frame");
+      }
+    }
+
+    RETURN_NOT_OK(this->Reserve(size));
+
+    for (R_xlen_t i = 0; i < size; i++) {
+      RETURN_NOT_OK(struct_builder_->Append());
+    }
+
+    for (R_xlen_t i = 0; i < n_columns; i++) {
+      auto status = children_[i]->Extend(VECTOR_ELT(x, i), size);
+      if (!status.ok()) {
+        return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(),
+                               "): ", status.ToString());
+      }
+    }
+
+    return Status::OK();
+  }
+
+ protected:
+  // Forwards to the base class initialization.
+  Status Init(MemoryPool* pool) override {
+    return StructConverter<RConverter, RConverterTrait>::Init(pool);
+  }
+};
+
+// Dictionary types: the converter is chosen per dictionary value type T
+// through the `dictionary_type` alias.
+template <>
+struct RConverterTrait<DictionaryType> {
+  template <typename T>
+  using dictionary_type = RDictionaryConverter<T>;
+};
+
+// ---- short circuit the Converter api entirely when we can do zero-copy
+
+// Tells whether an Array of the requested `type` can directly use the memory
+// of the R object `x` (through an RBuffer) instead of going through
+// ArrayBuilder, etc ...
+//
+// NOTE(review): Type::INT8 is accepted for raw vectors here, while
+// vec_to_arrow__reuse_memory() builds a UInt8Type array for RAWSXP - confirm
+// which of the two is intended.
+bool can_reuse_memory(SEXP x, const std::shared_ptr<arrow::DataType>& type) {
+  // TODO: this probably should be disabled when x is an ALTREP object
+  //       because MakeSimpleArray below will force materialization
+  const auto r_type = TYPEOF(x);
+  const bool is_bare = !OBJECT(x);
+
+  switch (type->id()) {
+    case Type::INT32:
+      return r_type == INTSXP && is_bare;
+    case Type::DOUBLE:
+      return r_type == REALSXP && is_bare;
+    case Type::INT8:
+      return r_type == RAWSXP && is_bare;
+    case Type::INT64:
+      return r_type == REALSXP && Rf_inherits(x, "integer64");
+    default:
+      return false;
+  }
+}
+
+// this is only used on some special cases when the arrow Array can just use the
+// memory of the R object, via an RBuffer, hence be zero copy
+//
+// The values buffer wraps the R vector itself. A validity bitmap is only
+// allocated when at least one NA is found; otherwise buffers[0] stays null
+// and the null count is 0.
+template <int RTYPE, typename RVector, typename Type>
+std::shared_ptr<Array> MakeSimpleArray(SEXP x) {
+  using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
+  RVector vec(x);
+  // 64-bit length and counters: R vectors may hold more than INT_MAX elements
+  const int64_t n = vec.size();
+  auto p_vec_start = reinterpret_cast<const value_type*>(DATAPTR_RO(vec));
+  auto p_vec_end = p_vec_start + n;
+  std::vector<std::shared_ptr<Buffer>> buffers{nullptr,
+                                               std::make_shared<RBuffer<RVector>>(vec)};
+
+  int64_t null_count = 0;
+
+  auto first_na = std::find_if(p_vec_start, p_vec_end, is_NA<value_type>);
+  if (first_na < p_vec_end) {
+    auto null_bitmap =
+        ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool()));
+    internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n);
+
+    // first loop: everything before the first NA is valid, set those bits
+    const int64_t n_before_first_na = std::distance(p_vec_start, first_na);
+    int64_t i = 0;
+    for (; i < n_before_first_na; i++, bitmap_writer.Next()) {
+      bitmap_writer.Set();
+    }
+
+    auto p_vec = first_na;
+    // then finish, testing each remaining value
+    for (; i < n; i++, bitmap_writer.Next(), ++p_vec) {
+      if (is_NA<value_type>(*p_vec)) {
+        bitmap_writer.Clear();
+        null_count++;
+      } else {
+        bitmap_writer.Set();
+      }
+    }
+
+    bitmap_writer.Finish();
+    buffers[0] = std::move(null_bitmap);
+  }
+
+  auto data = ArrayData::Make(std::make_shared<Type>(), n, std::move(buffers),
+                              null_count, 0 /*offset*/);
+
+  // return the right Array class
+  return std::make_shared<typename TypeTraits<Type>::ArrayType>(data);
+}
+
+// Builds a zero-copy Array on top of `x`, picking the Arrow type from the R
+// type of `x` (and from the "integer64" class for doubles). Stops with an R
+// error for any other R type.
+std::shared_ptr<arrow::Array> vec_to_arrow__reuse_memory(SEXP x) {
+  switch (TYPEOF(x)) {
+    case INTSXP:
+      return MakeSimpleArray<INTSXP, cpp11::integers, Int32Type>(x);
+    case REALSXP:
+      if (Rf_inherits(x, "integer64")) {
+        return MakeSimpleArray<REALSXP, cpp11::doubles, Int64Type>(x);
+      }
+      return MakeSimpleArray<REALSXP, cpp11::doubles, DoubleType>(x);
+    case RAWSXP:
+      return MakeSimpleArray<RAWSXP, cpp11::raws, UInt8Type>(x);
+    default:
+      break;
+  }
+
+  cpp11::stop("Unreachable: you might need to fix can_reuse_memory()");
+}
+
+// Converts the R object `x` to an Array of the given `type`.
+//
+// `x` is returned as is when it already is an Array. When zero-copy is
+// possible the R memory is reused, otherwise the Converter API is used.
+// `type_inferred` tells that `type` was inferred from `x`; conversion is
+// strict when it was not. Conversion failures stop with an R error.
+std::shared_ptr<arrow::Array> vec_to_arrow(SEXP x,
+                                           const std::shared_ptr<arrow::DataType>& type,
+                                           bool type_inferred) {
+  // nothing to convert when `x` is already an Array
+  if (Rf_inherits(x, "Array")) {
+    return cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x);
+  }
+
+  RConversionOptions options;
+  options.type = type;
+  options.strict = !type_inferred;
+  options.size = vctrs::short_vec_size(x);
+
+  // zero-copy fast path
+  if (can_reuse_memory(x, type)) {
+    return vec_to_arrow__reuse_memory(x);
+  }
+
+  // general path: build the array through a converter
+  auto converter = ValueOrStop(
+      MakeConverter<RConverter, RConverterTrait>(type, options, gc_memory_pool()));
+  StopIfNotOk(converter->Extend(x, options.size));
+  return ValueOrStop(converter->ToArray());
+}
+
+}  // namespace r
+}  // namespace arrow
+
+// R entry point: converts `x` to an Array R6 object. `x` is returned
+// untouched when it already is an Array. A NULL `s_type` means the Arrow
+// type is inferred from `x`.
+// [[arrow::export]]
+SEXP vec_to_arrow(SEXP x, SEXP s_type) {
+  if (Rf_inherits(x, "Array")) {
+    return x;
+  }
+
+  const bool type_inferred = Rf_isNull(s_type);
+  std::shared_ptr<arrow::DataType> type =
+      type_inferred ? arrow::r::InferArrowType(x)
+                    : cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(s_type);
+
+  return cpp11::to_r6(arrow::r::vec_to_arrow(x, type, type_inferred));
+}
+
+// Builds a dictionary array of the given `type` from `indices` and `dict`
+// by forwarding to arrow::DictionaryArray::FromArrays(); the result is
+// unwrapped with ValueOrStop().
+// [[arrow::export]]
+std::shared_ptr<arrow::Array> DictionaryArray__FromArrays(
+    const std::shared_ptr<arrow::DataType>& type,
+    const std::shared_ptr<arrow::Array>& indices,
+    const std::shared_ptr<arrow::Array>& dict) {
+  return ValueOrStop(arrow::DictionaryArray::FromArrays(type, indices, dict));
+}
+
+#endif
diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp
index 6eb1f0c..9628d46 100644
--- a/r/src/recordbatch.cpp
+++ b/r/src/recordbatch.cpp
@@ -268,7 +268,7 @@ std::shared_ptr<arrow::RecordBatch> 
RecordBatch__from_arrays__known_schema(
       cpp11::stop("field at index %d has name '%s' != '%s'", j + 1,
                   schema->field(j)->name().c_str(), name.c_str());
     }
-    arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), 
false);
+    arrays[j] = arrow::r::vec_to_arrow(x, schema->field(j)->type(), false);
   };
 
   arrow::r::TraverseDots(lst, num_fields, fill_array);
@@ -285,7 +285,7 @@ arrow::Status CollectRecordBatchArrays(
     SEXP lst, const std::shared_ptr<arrow::Schema>& schema, int num_fields, 
bool inferred,
     std::vector<std::shared_ptr<arrow::Array>>& arrays) {
   auto extract_one_array = [&arrays, &schema, inferred](int j, SEXP x, 
cpp11::r_string) {
-    arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), 
inferred);
+    arrays[j] = arrow::r::vec_to_arrow(x, schema->field(j)->type(), inferred);
   };
   arrow::r::TraverseDots(lst, num_fields, extract_one_array);
   return arrow::Status::OK();
diff --git a/r/src/table.cpp b/r/src/table.cpp
index 14e5f4e..081d14a 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -265,7 +265,7 @@ arrow::Status CollectTableColumns(
       columns[j] = std::make_shared<arrow::ChunkedArray>(
           cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x));
     } else {
-      auto array = arrow::r::Array__from_vector(x, schema->field(j)->type(), 
inferred);
+      auto array = arrow::r::vec_to_arrow(x, schema->field(j)->type(), 
inferred);
       columns[j] = std::make_shared<arrow::ChunkedArray>(array);
     }
   };
diff --git a/r/src/type_infer.cpp b/r/src/type_infer.cpp
new file mode 100644
index 0000000..93e51be
--- /dev/null
+++ b/r/src/type_infer.cpp
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "./arrow_types.h"
+#include "./arrow_vctrs.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+#include <arrow/array/array_base.h>
+
+namespace arrow {
+namespace r {
+
+// Picks the dictionary index type for a factor with `n_factors` levels:
+// int8 below INT8_MAX levels, int16 below INT16_MAX, int32 otherwise.
+static inline std::shared_ptr<arrow::DataType> IndexTypeForFactors(int n_factors) {
+  if (n_factors < INT8_MAX) {
+    return arrow::int8();
+  }
+  if (n_factors < INT16_MAX) {
+    return arrow::int16();
+  }
+  return arrow::int32();
+}
+
+// Dictionary type for an R factor: utf8 values, an index type sized from
+// the number of levels, ordered when the factor inherits from "ordered".
+std::shared_ptr<arrow::DataType> InferArrowTypeFromFactor(SEXP factor) {
+  const int n_levels = Rf_length(Rf_getAttrib(factor, R_LevelsSymbol));
+  const bool ordered = Rf_inherits(factor, "ordered");
+  return dictionary(IndexTypeForFactors(n_levels), arrow::utf8(), ordered);
+}
+
+// Fallback for R vector types that have no dedicated specialization: stops
+// with an R error that reports the numeric R type.
+template <int VectorType>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector(SEXP x) {
+  // cpp11::stop() takes a printf-style format: without a conversion
+  // specifier the type would never show up in the message
+  cpp11::stop("Unknown vector type: %d", VectorType);
+}
+
+// Environments are only supported when they are Array objects, in which
+// case the type of the wrapped array is returned.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<ENVSXP>(SEXP x) {
+  if (!Rf_inherits(x, "Array")) {
+    cpp11::stop("Unrecognized vector instance for type ENVSXP");
+  }
+
+  return cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x)->type();
+}
+
+// Logical vectors map to boolean, except those inheriting from
+// "vctrs_unspecified", which map to the null type.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<LGLSXP>(SEXP x) {
+  return Rf_inherits(x, "vctrs_unspecified") ? null() : boolean();
+}
+
+// Timestamp type (microsecond unit) for a POSIXct vector. The time zone is
+// the first element of the "tzone" attribute; the type has no time zone
+// when that attribute is not a non-empty character vector.
+static inline std::shared_ptr<arrow::DataType> InferTimestampTypeFromPOSIXct(SEXP x) {
+  SEXP tzone_sexp = Rf_getAttrib(x, symbols::tzone);
+  // guard the element access below against a missing or malformed attribute
+  if (TYPEOF(tzone_sexp) != STRSXP || XLENGTH(tzone_sexp) == 0) {
+    return timestamp(TimeUnit::MICRO);
+  }
+  return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0)));
+}
+
+// Integer vectors: factors map to dictionary, Date to date32, POSIXct to
+// timestamp, anything else to int32.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<INTSXP>(SEXP x) {
+  if (Rf_isFactor(x)) {
+    return InferArrowTypeFromFactor(x);
+  } else if (Rf_inherits(x, "Date")) {
+    return date32();
+  } else if (Rf_inherits(x, "POSIXct")) {
+    return InferTimestampTypeFromPOSIXct(x);
+  }
+  return int32();
+}
+
+// Double vectors: Date maps to date32, POSIXct to timestamp, integer64 to
+// int64, difftime to time32 (seconds), anything else to float64.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<REALSXP>(SEXP x) {
+  if (Rf_inherits(x, "Date")) {
+    return date32();
+  }
+  if (Rf_inherits(x, "POSIXct")) {
+    return InferTimestampTypeFromPOSIXct(x);
+  }
+  if (Rf_inherits(x, "integer64")) {
+    return int64();
+  }
+  if (Rf_inherits(x, "difftime")) {
+    return time32(TimeUnit::SECOND);
+  }
+  return float64();
+}
+
+// Character vectors map to utf8, or to large_utf8 as soon as the running
+// total of the string sizes exceeds arrow::kBinaryMemoryLimit.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<STRSXP>(SEXP x) {
+  return cpp11::unwind_protect([&] {
+    const R_xlen_t n_strings = XLENGTH(x);
+    int64_t total_bytes = 0;
+    bool needs_large = false;
+
+    for (R_xlen_t i = 0; i < n_strings && !needs_large; ++i) {
+      total_bytes += arrow::r::unsafe::r_string_size(STRING_ELT(x, i));
+      // Exceeds 2GB capacity of utf8 type, so use large
+      needs_large = total_bytes > arrow::kBinaryMemoryLimit;
+    }
+
+    return needs_large ? large_utf8() : utf8();
+  });
+}
+
+// Struct type for a data frame-like list: one field per element, named
+// after the element and typed by InferArrowType().
+static inline std::shared_ptr<arrow::DataType> InferArrowTypeFromDataFrame(
+    cpp11::list x) {
+  const R_xlen_t n_columns = x.size();
+  cpp11::strings column_names(x.attr(R_NamesSymbol));
+
+  std::vector<std::shared_ptr<arrow::Field>> fields;
+  fields.reserve(n_columns);
+  for (R_xlen_t i = 0; i < n_columns; i++) {
+    fields.push_back(arrow::field(column_names[i], InferArrowType(x[i])));
+  }
+  return arrow::struct_(std::move(fields));
+}
+
+// Lists: data frames and POSIXlt map to struct; the arrow_* binary classes
+// map to the matching binary type; any other list maps to a list type whose
+// value type is inferred from the "ptype" attribute or, without it, from
+// the first element.
+template <>
+std::shared_ptr<arrow::DataType> InferArrowTypeFromVector<VECSXP>(SEXP x) {
+  if (Rf_inherits(x, "data.frame") || Rf_inherits(x, "POSIXlt")) {
+    return InferArrowTypeFromDataFrame(x);
+  }
+
+  // some known special cases
+  if (Rf_inherits(x, "arrow_fixed_size_binary")) {
+    SEXP byte_width = Rf_getAttrib(x, symbols::byte_width);
+    const bool well_formed = !Rf_isNull(byte_width) && TYPEOF(byte_width) == INTSXP &&
+                             XLENGTH(byte_width) == 1;
+    if (!well_formed) {
+      cpp11::stop("malformed arrow_fixed_size_binary object");
+    }
+    return arrow::fixed_size_binary(INTEGER(byte_width)[0]);
+  }
+  if (Rf_inherits(x, "arrow_binary")) {
+    return arrow::binary();
+  }
+  if (Rf_inherits(x, "arrow_large_binary")) {
+    return arrow::large_binary();
+  }
+
+  // generic list: the value type comes from the ptype or the first element
+  SEXP ptype = Rf_getAttrib(x, symbols::ptype);
+  if (Rf_isNull(ptype)) {
+    if (XLENGTH(x) == 0) {
+      cpp11::stop(
+          "Requires at least one element to infer the values' type of a list vector");
+    }
+    ptype = VECTOR_ELT(x, 0);
+  }
+
+  return arrow::list(InferArrowType(ptype));
+}
+
+// Infers the Arrow type for the R object `x` by dispatching on its R type.
+// Raw vectors map to int8; the other supported R types are delegated to the
+// matching InferArrowTypeFromVector<> specialization. Stops with an R error
+// for any other R type.
+//
+// NOTE(review): raw vectors are inferred as int8 here, while
+// vec_to_arrow__reuse_memory() builds a UInt8Type array for RAWSXP - confirm
+// which of the two is intended.
+std::shared_ptr<arrow::DataType> InferArrowType(SEXP x) {
+  switch (TYPEOF(x)) {
+    case ENVSXP:
+      return InferArrowTypeFromVector<ENVSXP>(x);
+    case LGLSXP:
+      return InferArrowTypeFromVector<LGLSXP>(x);
+    case INTSXP:
+      return InferArrowTypeFromVector<INTSXP>(x);
+    case REALSXP:
+      return InferArrowTypeFromVector<REALSXP>(x);
+    case RAWSXP:
+      return int8();
+    case STRSXP:
+      return InferArrowTypeFromVector<STRSXP>(x);
+    case VECSXP:
+      return InferArrowTypeFromVector<VECSXP>(x);
+    default:
+      break;
+  }
+
+  cpp11::stop("Cannot infer type from vector");
+}
+
+}  // namespace r
+}  // namespace arrow
+
+// R entry point: returns the Arrow type inferred for `x` by
+// arrow::r::InferArrowType().
+// [[arrow::export]]
+std::shared_ptr<arrow::DataType> Array__infer_type(SEXP x) {
+  return arrow::r::InferArrowType(x);
+}
+
+#endif
diff --git a/r/tests/testthat/test-Array.R b/r/tests/testthat/test-Array.R
index 03c4f37..5cf0b0d 100644
--- a/r/tests/testthat/test-Array.R
+++ b/r/tests/testthat/test-Array.R
@@ -385,24 +385,23 @@ test_that("Array$create() supports the type= argument. 
conversion from INTSXP an
 })
 
 test_that("Array$create() aborts on overflow", {
-  msg <- "Invalid.*Value is too large"
-  expect_error(Array$create(128L, type = int8()), msg)
-  expect_error(Array$create(-129L, type = int8()), msg)
+  expect_error(Array$create(128L, type = int8()))
+  expect_error(Array$create(-129L, type = int8()))
 
-  expect_error(Array$create(256L, type = uint8()), msg)
-  expect_error(Array$create(-1L, type = uint8()), msg)
+  expect_error(Array$create(256L, type = uint8()))
+  expect_error(Array$create(-1L, type = uint8()))
 
-  expect_error(Array$create(32768L, type = int16()), msg)
-  expect_error(Array$create(-32769L, type = int16()), msg)
+  expect_error(Array$create(32768L, type = int16()))
+  expect_error(Array$create(-32769L, type = int16()))
 
-  expect_error(Array$create(65536L, type = uint16()), msg)
-  expect_error(Array$create(-1L, type = uint16()), msg)
+  expect_error(Array$create(65536L, type = uint16()))
+  expect_error(Array$create(-1L, type = uint16()))
 
-  expect_error(Array$create(65536L, type = uint16()), msg)
-  expect_error(Array$create(-1L, type = uint16()), msg)
+  expect_error(Array$create(65536L, type = uint16()))
+  expect_error(Array$create(-1L, type = uint16()))
 
-  expect_error(Array$create(bit64::as.integer64(2^31), type = int32()), msg)
-  expect_error(Array$create(bit64::as.integer64(2^32), type = uint32()), msg)
+  expect_error(Array$create(bit64::as.integer64(2^31), type = int32()))
+  expect_error(Array$create(bit64::as.integer64(2^32), type = uint32()))
 })
 
 test_that("Array$create() does not convert doubles to integer", {
@@ -483,7 +482,7 @@ test_that("Array$create() can handle data frame with custom 
struct type (not inf
   expect_error(Array$create(df, type = type), regexp = "Field name in 
position.*does not match the name of the column of the data frame")
 
   type <- struct(x = float64(), y = utf8())
-  expect_error(Array$create(df, type = type), regexp = "Expecting a character 
vector")
+  expect_error(Array$create(df, type = type), regexp = "Invalid")
 })
 
 test_that("Array$create() supports tibble with no columns (ARROW-8354)", {
@@ -651,8 +650,6 @@ test_that("Handling string data with embedded nuls", {
 
 test_that("Array$create() should have helpful error", {
   expect_error(Array$create(list(numeric(0)), list_of(bool())), "Expecting a 
logical vector")
-  expect_error(Array$create(list(numeric(0)), list_of(int32())), "Expecting an 
integer vector")
-  expect_error(Array$create(list(integer(0)), list_of(float64())), "Expecting 
a numeric vector")
 
   lgl <- logical(0)
   int <- integer(0)
@@ -661,7 +658,6 @@ test_that("Array$create() should have helpful error", {
   expect_error(Array$create(list()), "Requires at least one element to infer")
   expect_error(Array$create(list(lgl, lgl, int)), "Expecting a logical vector")
   expect_error(Array$create(list(char, num, char)), "Expecting a character 
vector")
-  expect_error(Array$create(list(int, int, num)), "Expecting an integer 
vector")
 })
 
 test_that("Array$View() (ARROW-6542)", {
diff --git a/r/tests/testthat/test-chunked-array.R 
b/r/tests/testthat/test-chunked-array.R
index 792e140..a5ff6ef 100644
--- a/r/tests/testthat/test-chunked-array.R
+++ b/r/tests/testthat/test-chunked-array.R
@@ -220,23 +220,23 @@ test_that("chunked_array() supports the type= argument. 
conversion from INTSXP a
 })
 
 test_that("ChunkedArray$create() aborts on overflow", {
-  expect_error(chunked_array(128L, type = int8())$type, "Invalid.*Value is too 
large")
-  expect_error(chunked_array(-129L, type = int8())$type, "Invalid.*Value is 
too large")
+  expect_error(chunked_array(128L, type = int8())$type)
+  expect_error(chunked_array(-129L, type = int8())$type)
 
-  expect_error(chunked_array(256L, type = uint8())$type, "Invalid.*Value is 
too large")
-  expect_error(chunked_array(-1L, type = uint8())$type, "Invalid.*Value is too 
large")
+  expect_error(chunked_array(256L, type = uint8())$type)
+  expect_error(chunked_array(-1L, type = uint8())$type)
 
-  expect_error(chunked_array(32768L, type = int16())$type, "Invalid.*Value is 
too large")
-  expect_error(chunked_array(-32769L, type = int16())$type, "Invalid.*Value is 
too large")
+  expect_error(chunked_array(32768L, type = int16())$type)
+  expect_error(chunked_array(-32769L, type = int16())$type)
 
-  expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*Value is 
too large")
-  expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*Value is 
too large")
+  expect_error(chunked_array(65536L, type = uint16())$type)
+  expect_error(chunked_array(-1L, type = uint16())$type)
 
-  expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*Value is 
too large")
-  expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*Value is 
too large")
+  expect_error(chunked_array(65536L, type = uint16())$type)
+  expect_error(chunked_array(-1L, type = uint16())$type)
 
-  expect_error(chunked_array(bit64::as.integer64(2^31), type = int32()), 
"Invalid.*Value is too large")
-  expect_error(chunked_array(bit64::as.integer64(2^32), type = uint32()), 
"Invalid.*Value is too large")
+  expect_error(chunked_array(bit64::as.integer64(2^31), type = int32()))
+  expect_error(chunked_array(bit64::as.integer64(2^32), type = uint32()))
 })
 
 test_that("chunked_array() convert doubles to integers", {

Reply via email to