Repository: arrow
Updated Branches:
  refs/heads/master 2615b4703 -> 6e2670125


ARROW-439: [Python] Add option in "to_pandas" conversions to yield Categorical 
from String/Binary arrays

I added support for casting Arrays and Columns to dictionary-encoded form, with 
the possibility of extending the casting to additional types.

I intend to add more types to the casting, at least for trivial cases, but 
first I wanted to get some feedback on the current state.

Author: fjetter <florian.jet...@blue-yonder.com>
Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #909 from fjetter/feature/make_dictionary_array and squashes the 
following commits:

d1189395 [Wes McKinney] Fix deprecated API usage
606724df [Wes McKinney] Handle ordered categories in arrow_to_pandas.cc. flake8 
Cython fixes
d2bb8d8e [Wes McKinney] Move dictionary index type dispatch and memory 
allocation into CategoricalBlock::Write
6ab28730 [fjetter] Remove dead code
bea4cb9e [fjetter] Merge master
bb3209ba [fjetter] Add pool to ConvertTableToPandas in python-test
24fbf424 [fjetter] Format arrow_to_pandas
39b22ff6 [fjetter] Allocate categorical blocks in write path
b7f389f3 [fjetter] Pass memory pool from the outside
c496cb5f [fjetter] Pass pandas options through to pandas write before conversion
4b12aa13 [fjetter] Push pandas options down
b6fca35c [fjetter] Rename and add docs for EncodeDictionary
6479d292 [fjetter] add MakeDictionaryArray


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6e267012
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6e267012
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6e267012

Branch: refs/heads/master
Commit: 6e26701257be160fa95ce174d80b046adb493e57
Parents: 2615b47
Author: fjetter <florian.jet...@blue-yonder.com>
Authored: Tue Aug 8 13:51:37 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Tue Aug 8 13:51:37 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/builder.cc                    |  81 +++-
 cpp/src/arrow/builder.h                     |  16 +
 cpp/src/arrow/python/arrow_to_pandas.cc     | 504 ++++++++++++-----------
 cpp/src/arrow/python/arrow_to_pandas.h      |  20 +-
 cpp/src/arrow/python/python-test.cc         |   4 +-
 cpp/src/arrow/util/parallel.h               |  70 ++++
 python/pyarrow/array.pxi                    |  12 +-
 python/pyarrow/includes/libarrow.pxd        |  15 +-
 python/pyarrow/pandas_compat.py             |   7 +-
 python/pyarrow/table.pxi                    |  29 +-
 python/pyarrow/tests/test_convert_pandas.py |  29 +-
 11 files changed, 502 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 889c64d..e2054db 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -27,6 +27,7 @@
 #include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -1396,8 +1397,84 @@ Status MakeDictionaryBuilder(MemoryPool* pool, const 
std::shared_ptr<DataType>&
     DICTIONARY_BUILDER_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
     DICTIONARY_BUILDER_CASE(STRING, StringDictionaryBuilder);
     DICTIONARY_BUILDER_CASE(BINARY, BinaryDictionaryBuilder);
-    // DICTIONARY_BUILDER_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryBuilder);
-    // DICTIONARY_BUILDER_CASE(DECIMAL, DecimalBuilder);
+    default:
+      return Status::NotImplemented(type->ToString());
+  }
+}
+
+#define DICTIONARY_ARRAY_CASE(ENUM, BuilderType)                           \
+  case Type::ENUM:                                                         \
+    builder = std::make_shared<BuilderType>(type, pool);                   \
+    RETURN_NOT_OK(static_cast<BuilderType&>(*builder).AppendArray(input)); \
+    RETURN_NOT_OK(builder->Finish(out));                                   \
+    return Status::OK();
+
+Status EncodeArrayToDictionary(const Array& input, MemoryPool* pool,
+                               std::shared_ptr<Array>* out) {
+  const std::shared_ptr<DataType>& type = input.data()->type;
+  std::shared_ptr<ArrayBuilder> builder;
+  switch (type->id()) {
+    DICTIONARY_ARRAY_CASE(UINT8, DictionaryBuilder<UInt8Type>);
+    DICTIONARY_ARRAY_CASE(INT8, DictionaryBuilder<Int8Type>);
+    DICTIONARY_ARRAY_CASE(UINT16, DictionaryBuilder<UInt16Type>);
+    DICTIONARY_ARRAY_CASE(INT16, DictionaryBuilder<Int16Type>);
+    DICTIONARY_ARRAY_CASE(UINT32, DictionaryBuilder<UInt32Type>);
+    DICTIONARY_ARRAY_CASE(INT32, DictionaryBuilder<Int32Type>);
+    DICTIONARY_ARRAY_CASE(UINT64, DictionaryBuilder<UInt64Type>);
+    DICTIONARY_ARRAY_CASE(INT64, DictionaryBuilder<Int64Type>);
+    DICTIONARY_ARRAY_CASE(DATE32, DictionaryBuilder<Date32Type>);
+    DICTIONARY_ARRAY_CASE(DATE64, DictionaryBuilder<Date64Type>);
+    DICTIONARY_ARRAY_CASE(TIME32, DictionaryBuilder<Time32Type>);
+    DICTIONARY_ARRAY_CASE(TIME64, DictionaryBuilder<Time64Type>);
+    DICTIONARY_ARRAY_CASE(TIMESTAMP, DictionaryBuilder<TimestampType>);
+    DICTIONARY_ARRAY_CASE(FLOAT, DictionaryBuilder<FloatType>);
+    DICTIONARY_ARRAY_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
+    DICTIONARY_ARRAY_CASE(STRING, StringDictionaryBuilder);
+    DICTIONARY_ARRAY_CASE(BINARY, BinaryDictionaryBuilder);
+    default:
+      return Status::NotImplemented(type->ToString());
+  }
+}
+#define DICTIONARY_COLUMN_CASE(ENUM, BuilderType)                             \
+  case Type::ENUM:                                                            \
+    builder = std::make_shared<BuilderType>(type, pool);                      \
+    chunks = input.data();                                                    \
+    for (auto chunk : chunks->chunks()) {                                     \
+      RETURN_NOT_OK(static_cast<BuilderType&>(*builder).AppendArray(*chunk)); \
+    }                                                                         \
+    RETURN_NOT_OK(builder->Finish(&arr));                                     \
+    *out = std::make_shared<Column>(input.name(), arr);                       \
+    return Status::OK();
+
+/// \brief Encodes a column to a suitable dictionary type
+/// \param input Column to be encoded
+/// \param pool MemoryPool to allocate the dictionary
+/// \param out The new column
+/// \return Status
+Status EncodeColumnToDictionary(const Column& input, MemoryPool* pool,
+                                std::shared_ptr<Column>* out) {
+  const std::shared_ptr<DataType>& type = input.type();
+  std::shared_ptr<ArrayBuilder> builder;
+  std::shared_ptr<Array> arr;
+  std::shared_ptr<ChunkedArray> chunks;
+  switch (type->id()) {
+    DICTIONARY_COLUMN_CASE(UINT8, DictionaryBuilder<UInt8Type>);
+    DICTIONARY_COLUMN_CASE(INT8, DictionaryBuilder<Int8Type>);
+    DICTIONARY_COLUMN_CASE(UINT16, DictionaryBuilder<UInt16Type>);
+    DICTIONARY_COLUMN_CASE(INT16, DictionaryBuilder<Int16Type>);
+    DICTIONARY_COLUMN_CASE(UINT32, DictionaryBuilder<UInt32Type>);
+    DICTIONARY_COLUMN_CASE(INT32, DictionaryBuilder<Int32Type>);
+    DICTIONARY_COLUMN_CASE(UINT64, DictionaryBuilder<UInt64Type>);
+    DICTIONARY_COLUMN_CASE(INT64, DictionaryBuilder<Int64Type>);
+    DICTIONARY_COLUMN_CASE(DATE32, DictionaryBuilder<Date32Type>);
+    DICTIONARY_COLUMN_CASE(DATE64, DictionaryBuilder<Date64Type>);
+    DICTIONARY_COLUMN_CASE(TIME32, DictionaryBuilder<Time32Type>);
+    DICTIONARY_COLUMN_CASE(TIME64, DictionaryBuilder<Time64Type>);
+    DICTIONARY_COLUMN_CASE(TIMESTAMP, DictionaryBuilder<TimestampType>);
+    DICTIONARY_COLUMN_CASE(FLOAT, DictionaryBuilder<FloatType>);
+    DICTIONARY_COLUMN_CASE(DOUBLE, DictionaryBuilder<DoubleType>);
+    DICTIONARY_COLUMN_CASE(STRING, StringDictionaryBuilder);
+    DICTIONARY_COLUMN_CASE(BINARY, BinaryDictionaryBuilder);
     default:
       return Status::NotImplemented(type->ToString());
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index b15005f..46900fc 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -28,6 +28,7 @@
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
@@ -913,6 +914,21 @@ Status ARROW_EXPORT MakeDictionaryBuilder(MemoryPool* pool,
                                           const std::shared_ptr<DataType>& 
type,
                                           std::shared_ptr<ArrayBuilder>* out);
 
+/// \brief Convert Array to encoded DictionaryArray form
+///
+/// \param[in] input The Array to be encoded
+/// \param[in] pool MemoryPool to allocate memory for the hash table
+/// \param[out] out Array encoded to DictionaryArray
+Status ARROW_EXPORT EncodeArrayToDictionary(const Array& input, MemoryPool* 
pool,
+                                            std::shared_ptr<Array>* out);
+
+/// \brief Convert a Column's data internally to DictionaryArray
+///
+/// \param[in] input The ChunkedArray to be encoded
+/// \param[in] pool MemoryPool to allocate memory for the hash table
+/// \param[out] out Column with data converted to DictionaryArray
+Status ARROW_EXPORT EncodeColumnToDictionary(const Column& input, MemoryPool* 
pool,
+                                             std::shared_ptr<Column>* out);
 }  // namespace arrow
 
 #endif  // ARROW_BUILDER_H_

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/python/arrow_to_pandas.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc 
b/cpp/src/arrow/python/arrow_to_pandas.cc
index 8c769ee..23bef7b 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -22,14 +22,11 @@
 #include "arrow/python/arrow_to_pandas.h"
 
 #include <algorithm>
-#include <atomic>
 #include <cmath>
 #include <cstdint>
 #include <memory>
-#include <mutex>
 #include <sstream>
 #include <string>
-#include <thread>
 #include <unordered_map>
 #include <vector>
 
@@ -42,6 +39,7 @@
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/parallel.h"
 #include "arrow/visitor_inline.h"
 
 #include "arrow/python/builtin_convert.h"
@@ -186,8 +184,8 @@ class PandasBlock {
     CATEGORICAL
   };
 
-  PandasBlock(int64_t num_rows, int num_columns)
-      : num_rows_(num_rows), num_columns_(num_columns) {}
+  PandasBlock(PandasOptions options, int64_t num_rows, int num_columns)
+      : num_rows_(num_rows), num_columns_(num_columns), options_(options) {}
   virtual ~PandasBlock() {}
 
   virtual Status Allocate() = 0;
@@ -255,6 +253,8 @@ class PandasBlock {
   OwnedRef block_arr_;
   uint8_t* block_data_;
 
+  PandasOptions options_;
+
   // ndarray<int32>
   OwnedRef placement_arr_;
   int64_t* placement_data_;
@@ -264,7 +264,8 @@ class PandasBlock {
 };
 
 template <typename T>
-inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* 
out_values) {
+inline void ConvertIntegerWithNulls(PandasOptions options, const ChunkedArray& 
data,
+                                    double* out_values) {
   for (int c = 0; c < data.num_chunks(); c++) {
     const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
     auto in_values = reinterpret_cast<const T*>(arr.raw_values());
@@ -277,7 +278,8 @@ inline void ConvertIntegerWithNulls(const ChunkedArray& 
data, double* out_values
 }
 
 template <typename T>
-inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* 
out_values) {
+inline void ConvertIntegerNoNullsSameType(PandasOptions options, const 
ChunkedArray& data,
+                                          T* out_values) {
   for (int c = 0; c < data.num_chunks(); c++) {
     const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
     auto in_values = reinterpret_cast<const T*>(arr.raw_values());
@@ -287,7 +289,8 @@ inline void ConvertIntegerNoNullsSameType(const 
ChunkedArray& data, T* out_value
 }
 
 template <typename InType, typename OutType>
-inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* 
out_values) {
+inline void ConvertIntegerNoNullsCast(PandasOptions options, const 
ChunkedArray& data,
+                                      OutType* out_values) {
   for (int c = 0; c < data.num_chunks(); c++) {
     const auto& arr = static_cast<const PrimitiveArray&>(*data.chunk(c));
     auto in_values = reinterpret_cast<const InType*>(arr.raw_values());
@@ -297,7 +300,8 @@ inline void ConvertIntegerNoNullsCast(const ChunkedArray& 
data, OutType* out_val
   }
 }
 
-static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** 
out_values) {
+static Status ConvertBooleanWithNulls(PandasOptions options, const 
ChunkedArray& data,
+                                      PyObject** out_values) {
   PyAcquireGIL lock;
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
@@ -321,7 +325,8 @@ static Status ConvertBooleanWithNulls(const ChunkedArray& 
data, PyObject** out_v
   return Status::OK();
 }
 
-static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* 
out_values) {
+static void ConvertBooleanNoNulls(PandasOptions options, const ChunkedArray& 
data,
+                                  uint8_t* out_values) {
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto bool_arr = static_cast<BooleanArray*>(arr.get());
@@ -332,7 +337,8 @@ static void ConvertBooleanNoNulls(const ChunkedArray& data, 
uint8_t* out_values)
 }
 
 template <typename Type>
-inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** 
out_values) {
+inline Status ConvertBinaryLike(PandasOptions options, const ChunkedArray& 
data,
+                                PyObject** out_values) {
   using ArrayType = typename TypeTraits<Type>::ArrayType;
   PyAcquireGIL lock;
   for (int c = 0; c < data.num_chunks(); c++) {
@@ -362,7 +368,8 @@ inline Status ConvertBinaryLike(const ChunkedArray& data, 
PyObject** out_values)
   return Status::OK();
 }
 
-inline Status ConvertNulls(const ChunkedArray& data, PyObject** out_values) {
+inline Status ConvertNulls(PandasOptions options, const ChunkedArray& data,
+                           PyObject** out_values) {
   PyAcquireGIL lock;
   for (int c = 0; c < data.num_chunks(); c++) {
     std::shared_ptr<Array> arr = data.chunk(c);
@@ -377,7 +384,8 @@ inline Status ConvertNulls(const ChunkedArray& data, 
PyObject** out_values) {
   return Status::OK();
 }
 
-inline Status ConvertFixedSizeBinary(const ChunkedArray& data, PyObject** 
out_values) {
+inline Status ConvertFixedSizeBinary(PandasOptions options, const 
ChunkedArray& data,
+                                     PyObject** out_values) {
   PyAcquireGIL lock;
   for (int c = 0; c < data.num_chunks(); c++) {
     auto arr = static_cast<FixedSizeBinaryArray*>(data.chunk(c).get());
@@ -407,7 +415,8 @@ inline Status ConvertFixedSizeBinary(const ChunkedArray& 
data, PyObject** out_va
   return Status::OK();
 }
 
-inline Status ConvertStruct(const ChunkedArray& data, PyObject** out_values) {
+inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
+                            PyObject** out_values) {
   PyAcquireGIL lock;
   if (data.num_chunks() <= 0) {
     return Status::OK();
@@ -424,8 +433,8 @@ inline Status ConvertStruct(const ChunkedArray& data, 
PyObject** out_values) {
     // Convert the struct arrays first
     for (int32_t i = 0; i < num_fields; i++) {
       PyObject* numpy_array;
-      RETURN_NOT_OK(
-          ConvertArrayToPandas(arr->field(static_cast<int>(i)), nullptr, 
&numpy_array));
+      RETURN_NOT_OK(ConvertArrayToPandas(options, 
arr->field(static_cast<int>(i)),
+                                         nullptr, &numpy_array));
       fields_data[i].reset(numpy_array);
     }
 
@@ -470,7 +479,7 @@ inline Status ConvertStruct(const ChunkedArray& data, 
PyObject** out_values) {
 }
 
 template <typename ArrowType>
-inline Status ConvertListsLike(const std::shared_ptr<Column>& col,
+inline Status ConvertListsLike(PandasOptions options, const 
std::shared_ptr<Column>& col,
                                PyObject** out_values) {
   const ChunkedArray& data = *col->data().get();
   auto list_type = std::static_pointer_cast<ListType>(col->type());
@@ -485,7 +494,7 @@ inline Status ConvertListsLike(const 
std::shared_ptr<Column>& col,
   // TODO(ARROW-489): Currently we don't have a Python reference for single 
columns.
   //    Storing a reference to the whole Array would be to expensive.
   PyObject* numpy_array;
-  RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array));
+  RETURN_NOT_OK(ConvertColumnToPandas(options, flat_column, nullptr, 
&numpy_array));
 
   PyAcquireGIL lock;
 
@@ -560,7 +569,8 @@ inline void ConvertDatetimeNanos(const ChunkedArray& data, 
int64_t* out_values)
 }
 
 template <typename TYPE>
-static Status ConvertTimes(const ChunkedArray& data, PyObject** out_values) {
+static Status ConvertTimes(PandasOptions options, const ChunkedArray& data,
+                           PyObject** out_values) {
   using ArrayType = typename TypeTraits<TYPE>::ArrayType;
 
   PyAcquireGIL lock;
@@ -629,7 +639,8 @@ Status RawDecimalToString(const uint8_t* bytes, int 
precision, int scale,
   return Status::OK();
 }
 
-static Status ConvertDecimals(const ChunkedArray& data, PyObject** out_values) 
{
+static Status ConvertDecimals(PandasOptions options, const ChunkedArray& data,
+                              PyObject** out_values) {
   PyAcquireGIL lock;
   OwnedRef decimal_ref;
   OwnedRef Decimal_ref;
@@ -673,9 +684,9 @@ static Status ConvertDecimals(const ChunkedArray& data, 
PyObject** out_values) {
   return Status::OK();
 }
 
-#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum)                \
-  case Type::ArrowEnum:                                            \
-    RETURN_NOT_OK((ConvertListsLike<ArrowType>(col, out_buffer))); \
+#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum)                          \
+  case Type::ArrowEnum:                                                      \
+    RETURN_NOT_OK((ConvertListsLike<ArrowType>(options_, col, out_buffer))); \
     break;
 
 class ObjectBlock : public PandasBlock {
@@ -693,21 +704,21 @@ class ObjectBlock : public PandasBlock {
     const ChunkedArray& data = *col->data().get();
 
     if (type == Type::BOOL) {
-      RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer));
+      RETURN_NOT_OK(ConvertBooleanWithNulls(options_, data, out_buffer));
     } else if (type == Type::BINARY) {
-      RETURN_NOT_OK(ConvertBinaryLike<BinaryType>(data, out_buffer));
+      RETURN_NOT_OK(ConvertBinaryLike<BinaryType>(options_, data, out_buffer));
     } else if (type == Type::STRING) {
-      RETURN_NOT_OK(ConvertBinaryLike<StringType>(data, out_buffer));
+      RETURN_NOT_OK(ConvertBinaryLike<StringType>(options_, data, out_buffer));
     } else if (type == Type::FIXED_SIZE_BINARY) {
-      RETURN_NOT_OK(ConvertFixedSizeBinary(data, out_buffer));
+      RETURN_NOT_OK(ConvertFixedSizeBinary(options_, data, out_buffer));
     } else if (type == Type::TIME32) {
-      RETURN_NOT_OK(ConvertTimes<Time32Type>(data, out_buffer));
+      RETURN_NOT_OK(ConvertTimes<Time32Type>(options_, data, out_buffer));
     } else if (type == Type::TIME64) {
-      RETURN_NOT_OK(ConvertTimes<Time64Type>(data, out_buffer));
+      RETURN_NOT_OK(ConvertTimes<Time64Type>(options_, data, out_buffer));
     } else if (type == Type::DECIMAL) {
-      RETURN_NOT_OK(ConvertDecimals(data, out_buffer));
+      RETURN_NOT_OK(ConvertDecimals(options_, data, out_buffer));
     } else if (type == Type::NA) {
-      RETURN_NOT_OK(ConvertNulls(data, out_buffer));
+      RETURN_NOT_OK(ConvertNulls(options_, data, out_buffer));
     } else if (type == Type::LIST) {
       auto list_type = std::static_pointer_cast<ListType>(col->type());
       switch (list_type->value_type()->id()) {
@@ -732,7 +743,7 @@ class ObjectBlock : public PandasBlock {
         }
       }
     } else if (type == Type::STRUCT) {
-      RETURN_NOT_OK(ConvertStruct(data, out_buffer));
+      RETURN_NOT_OK(ConvertStruct(options_, data, out_buffer));
     } else {
       std::stringstream ss;
       ss << "Unsupported type for object array output: " << 
col->type()->ToString();
@@ -768,7 +779,7 @@ class IntBlock : public PandasBlock {
       return Status::NotImplemented(ss.str());
     }
 
-    ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer);
+    ConvertIntegerNoNullsSameType<C_TYPE>(options_, data, out_buffer);
     placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
@@ -821,8 +832,8 @@ class Float64Block : public PandasBlock {
 
     const ChunkedArray& data = *col->data().get();
 
-#define INTEGER_CASE(IN_TYPE)                         \
-  ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \
+#define INTEGER_CASE(IN_TYPE)                                   \
+  ConvertIntegerWithNulls<IN_TYPE>(options_, data, out_buffer); \
   break;
 
     switch (type) {
@@ -881,7 +892,7 @@ class BoolBlock : public PandasBlock {
     uint8_t* out_buffer =
         reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_;
 
-    ConvertBooleanNoNulls(*col->data().get(), out_buffer);
+    ConvertBooleanNoNulls(options_, *col->data().get(), out_buffer);
     placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
@@ -946,8 +957,8 @@ class DatetimeBlock : public PandasBlock {
 
 class DatetimeTZBlock : public DatetimeBlock {
  public:
-  DatetimeTZBlock(const std::string& timezone, int64_t num_rows)
-      : DatetimeBlock(num_rows, 1), timezone_(timezone) {}
+  DatetimeTZBlock(PandasOptions options, const std::string& timezone, int64_t 
num_rows)
+      : DatetimeBlock(options, num_rows, 1), timezone_(timezone) {}
 
   // Like Categorical, the internal ndarray is 1-dimensional
   Status Allocate() override { return AllocateDatetime(1); }
@@ -973,25 +984,25 @@ class DatetimeTZBlock : public DatetimeBlock {
   std::string timezone_;
 };
 
-template <int ARROW_INDEX_TYPE>
 class CategoricalBlock : public PandasBlock {
  public:
-  explicit CategoricalBlock(int64_t num_rows) : PandasBlock(num_rows, 1) {}
-  Status Allocate() override {
-    constexpr int npy_type = 
internal::arrow_traits<ARROW_INDEX_TYPE>::npy_type;
+  explicit CategoricalBlock(PandasOptions options, MemoryPool* pool, int64_t 
num_rows)
+      : PandasBlock(options, num_rows, 1), pool_(pool) {}
 
-    if (!(npy_type == NPY_INT8 || npy_type == NPY_INT16 || npy_type == 
NPY_INT32 ||
-          npy_type == NPY_INT64)) {
-      return Status::Invalid("Category indices must be signed integers");
-    }
-    return AllocateNDArray(npy_type, 1);
+  Status Allocate() override {
+    return Status::NotImplemented(
+        "CategoricalBlock allocation happens when calling Write");
   }
 
-  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-               int64_t rel_placement) override {
-    using T = typename internal::arrow_traits<ARROW_INDEX_TYPE>::T;
+  template <int ARROW_INDEX_TYPE>
+  Status WriteIndices(const std::shared_ptr<Column>& col) {
+    using TRAITS = internal::arrow_traits<ARROW_INDEX_TYPE>;
+    using T = typename TRAITS::T;
+    constexpr int npy_type = TRAITS::npy_type;
+    RETURN_NOT_OK(AllocateNDArray(npy_type, 1));
 
-    T* out_values = reinterpret_cast<T*>(block_data_) + rel_placement * 
num_rows_;
+    // No relative placement offset because a single column
+    T* out_values = reinterpret_cast<T*>(block_data_);
 
     const ChunkedArray& data = *col->data().get();
 
@@ -1008,13 +1019,48 @@ class CategoricalBlock : public PandasBlock {
       }
     }
 
-    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
+
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+               int64_t rel_placement) override {
+    std::shared_ptr<Column> converted_col;
+    if (options_.strings_to_categorical &&
+        (col->type()->id() == Type::STRING || col->type()->id() == 
Type::BINARY)) {
+      RETURN_NOT_OK(EncodeColumnToDictionary(static_cast<const Column&>(*col), 
pool_,
+                                             &converted_col));
+    } else {
+      converted_col = col;
+    }
 
-    auto dict_type = static_cast<const DictionaryType*>(col->type().get());
+    const auto& dict_type = static_cast<const 
DictionaryType&>(*converted_col->type());
+
+    switch (dict_type.index_type()->id()) {
+      case Type::INT8:
+        RETURN_NOT_OK(WriteIndices<Type::INT8>(converted_col));
+        break;
+      case Type::INT16:
+        RETURN_NOT_OK(WriteIndices<Type::INT16>(converted_col));
+        break;
+      case Type::INT32:
+        RETURN_NOT_OK(WriteIndices<Type::INT32>(converted_col));
+        break;
+      case Type::INT64:
+        RETURN_NOT_OK(WriteIndices<Type::INT64>(converted_col));
+        break;
+      default: {
+        std::stringstream ss;
+        ss << "Categorical index type not supported: "
+           << dict_type.index_type()->ToString();
+        return Status::NotImplemented(ss.str());
+      }
+    }
 
+    placement_data_[rel_placement] = abs_placement;
     PyObject* dict;
-    RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, 
&dict));
+    RETURN_NOT_OK(ConvertArrayToPandas(options_, dict_type.dictionary(), 
nullptr, &dict));
     dictionary_.reset(dict);
+    ordered_ = dict_type.ordered();
 
     return Status::OK();
   }
@@ -1027,20 +1073,26 @@ class CategoricalBlock : public PandasBlock {
     PyDict_SetItemString(result, "dictionary", dictionary_.obj());
     PyDict_SetItemString(result, "placement", placement_arr_.obj());
 
+    PyObject* py_ordered = ordered_ ? Py_True : Py_False;
+    Py_INCREF(py_ordered);
+    PyDict_SetItemString(result, "ordered", py_ordered);
+
     *output = result;
 
     return Status::OK();
   }
 
  protected:
+  MemoryPool* pool_;
   OwnedRef dictionary_;
+  bool ordered_;
 };
 
-Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns,
-                 std::shared_ptr<PandasBlock>* block) {
-#define BLOCK_CASE(NAME, TYPE)                              \
-  case PandasBlock::NAME:                                   \
-    *block = std::make_shared<TYPE>(num_rows, num_columns); \
+Status MakeBlock(PandasOptions options, PandasBlock::type type, int64_t 
num_rows,
+                 int num_columns, std::shared_ptr<PandasBlock>* block) {
+#define BLOCK_CASE(NAME, TYPE)                                       \
+  case PandasBlock::NAME:                                            \
+    *block = std::make_shared<TYPE>(options, num_rows, num_columns); \
     break;
 
   switch (type) {
@@ -1066,36 +1118,94 @@ Status MakeBlock(PandasBlock::type type, int64_t 
num_rows, int num_columns,
   return (*block)->Allocate();
 }
 
-static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& 
type,
-                                          int64_t num_rows,
-                                          std::shared_ptr<PandasBlock>* block) 
{
-  // All categoricals become a block with a single column
-  auto dict_type = static_cast<const DictionaryType*>(type.get());
-  switch (dict_type->index_type()->id()) {
+using BlockMap = std::unordered_map<int, std::shared_ptr<PandasBlock>>;
+
+static Status GetPandasBlockType(const Column& col, const PandasOptions& 
options,
+                                 PandasBlock::type* output_type) {
+  switch (col.type()->id()) {
+    case Type::BOOL:
+      *output_type = col.null_count() > 0 ? PandasBlock::OBJECT : 
PandasBlock::BOOL;
+      break;
+    case Type::UINT8:
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT8;
+      break;
     case Type::INT8:
-      *block = std::make_shared<CategoricalBlock<Type::INT8>>(num_rows);
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT8;
+      break;
+    case Type::UINT16:
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT16;
       break;
     case Type::INT16:
-      *block = std::make_shared<CategoricalBlock<Type::INT16>>(num_rows);
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT16;
+      break;
+    case Type::UINT32:
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT32;
       break;
     case Type::INT32:
-      *block = std::make_shared<CategoricalBlock<Type::INT32>>(num_rows);
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT32;
       break;
     case Type::INT64:
-      *block = std::make_shared<CategoricalBlock<Type::INT64>>(num_rows);
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT64;
+      break;
+    case Type::UINT64:
+      *output_type = col.null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT64;
+      break;
+    case Type::FLOAT:
+      *output_type = PandasBlock::FLOAT;
+      break;
+    case Type::DOUBLE:
+      *output_type = PandasBlock::DOUBLE;
+      break;
+    case Type::STRING:
+    case Type::BINARY:
+      if (options.strings_to_categorical) {
+        *output_type = PandasBlock::CATEGORICAL;
+        break;
+      }
+    case Type::NA:
+    case Type::FIXED_SIZE_BINARY:
+    case Type::STRUCT:
+    case Type::TIME32:
+    case Type::TIME64:
+    case Type::DECIMAL:
+      *output_type = PandasBlock::OBJECT;
+      break;
+    case Type::DATE32:
+      *output_type = PandasBlock::DATETIME;
+      break;
+    case Type::DATE64:
+      *output_type = PandasBlock::DATETIME;
+      break;
+    case Type::TIMESTAMP: {
+      const auto& ts_type = static_cast<const TimestampType&>(*col.type());
+      if (ts_type.timezone() != "") {
+        *output_type = PandasBlock::DATETIME_WITH_TZ;
+      } else {
+        *output_type = PandasBlock::DATETIME;
+      }
+    } break;
+    case Type::LIST: {
+      auto list_type = std::static_pointer_cast<ListType>(col.type());
+      if (!ListTypeSupported(*list_type->value_type())) {
+        std::stringstream ss;
+        ss << "Not implemented type for list in DataFrameBlock: "
+           << list_type->value_type()->ToString();
+        return Status::NotImplemented(ss.str());
+      }
+      *output_type = PandasBlock::OBJECT;
+    } break;
+    case Type::DICTIONARY:
+      *output_type = PandasBlock::CATEGORICAL;
       break;
-    default: {
+    default:
       std::stringstream ss;
-      ss << "Categorical index type not implemented: "
-         << dict_type->index_type()->ToString();
+      ss << "No known equivalent Pandas block for Arrow data of type ";
+      ss << col.type()->ToString() << " is known.";
       return Status::NotImplemented(ss.str());
-    }
   }
-  return (*block)->Allocate();
+  return Status::OK();
 }
 
-using BlockMap = std::unordered_map<int, std::shared_ptr<PandasBlock>>;
-
 // Construct the exact pandas 0.x "BlockManager" memory layout
 //
 // * For each column determine the correct output pandas type
@@ -1105,7 +1215,9 @@ using BlockMap = std::unordered_map<int, 
std::shared_ptr<PandasBlock>>;
 // * placement arrays as we go
 class DataFrameBlockCreator {
  public:
-  explicit DataFrameBlockCreator(const std::shared_ptr<Table>& table) : 
table_(table) {}
+  explicit DataFrameBlockCreator(const PandasOptions& options,
+                                 const std::shared_ptr<Table>& table, 
MemoryPool* pool)
+      : table_(table), options_(options), pool_(pool) {}
 
   Status Convert(int nthreads, PyObject** output) {
     column_types_.resize(table_->num_columns());
@@ -1123,94 +1235,17 @@ class DataFrameBlockCreator {
     for (int i = 0; i < table_->num_columns(); ++i) {
       std::shared_ptr<Column> col = table_->column(i);
       PandasBlock::type output_type;
-
-      Type::type column_type = col->type()->id();
-      switch (column_type) {
-        case Type::BOOL:
-          output_type = col->null_count() > 0 ? PandasBlock::OBJECT : 
PandasBlock::BOOL;
-          break;
-        case Type::UINT8:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT8;
-          break;
-        case Type::INT8:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT8;
-          break;
-        case Type::UINT16:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT16;
-          break;
-        case Type::INT16:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT16;
-          break;
-        case Type::UINT32:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT32;
-          break;
-        case Type::INT32:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT32;
-          break;
-        case Type::INT64:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::INT64;
-          break;
-        case Type::UINT64:
-          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : 
PandasBlock::UINT64;
-          break;
-        case Type::FLOAT:
-          output_type = PandasBlock::FLOAT;
-          break;
-        case Type::DOUBLE:
-          output_type = PandasBlock::DOUBLE;
-          break;
-        case Type::NA:
-        case Type::STRING:
-        case Type::BINARY:
-        case Type::FIXED_SIZE_BINARY:
-        case Type::STRUCT:
-        case Type::TIME32:
-        case Type::TIME64:
-        case Type::DECIMAL:
-          output_type = PandasBlock::OBJECT;
-          break;
-        case Type::DATE32:
-          output_type = PandasBlock::DATETIME;
-          break;
-        case Type::DATE64:
-          output_type = PandasBlock::DATETIME;
-          break;
-        case Type::TIMESTAMP: {
-          const auto& ts_type = static_cast<const 
TimestampType&>(*col->type());
-          if (ts_type.timezone() != "") {
-            output_type = PandasBlock::DATETIME_WITH_TZ;
-          } else {
-            output_type = PandasBlock::DATETIME;
-          }
-        } break;
-        case Type::LIST: {
-          auto list_type = std::static_pointer_cast<ListType>(col->type());
-          if (!ListTypeSupported(*list_type->value_type())) {
-            std::stringstream ss;
-            ss << "Not implemented type for list in DataFrameBlock: "
-               << list_type->value_type()->ToString();
-            return Status::NotImplemented(ss.str());
-          }
-          output_type = PandasBlock::OBJECT;
-        } break;
-        case Type::DICTIONARY:
-          output_type = PandasBlock::CATEGORICAL;
-          break;
-        default:
-          std::stringstream ss;
-          ss << "No known equivalent Pandas block for Arrow data of type ";
-          ss << col->type()->ToString() << " is known.";
-          return Status::NotImplemented(ss.str());
-      }
+      RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
 
       int block_placement = 0;
       std::shared_ptr<PandasBlock> block;
       if (output_type == PandasBlock::CATEGORICAL) {
-        RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), 
&block));
+        block = std::make_shared<CategoricalBlock>(options_, pool_, 
table_->num_rows());
         categorical_blocks_[i] = block;
       } else if (output_type == PandasBlock::DATETIME_WITH_TZ) {
         const auto& ts_type = static_cast<const TimestampType&>(*col->type());
-        block = std::make_shared<DatetimeTZBlock>(ts_type.timezone(), 
table_->num_rows());
+        block = std::make_shared<DatetimeTZBlock>(options_, ts_type.timezone(),
+                                                  table_->num_rows());
         RETURN_NOT_OK(block->Allocate());
         datetimetz_blocks_[i] = block;
       } else {
@@ -1224,92 +1259,61 @@ class DataFrameBlockCreator {
           type_counts_[output_type] = 1;
         }
       }
-
       column_types_[i] = output_type;
       column_block_placement_[i] = block_placement;
     }
 
     // Create normal non-categorical blocks
-    for (const auto& it : type_counts_) {
+    for (const auto& it : this->type_counts_) {
       PandasBlock::type type = static_cast<PandasBlock::type>(it.first);
       std::shared_ptr<PandasBlock> block;
-      RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block));
-      blocks_[type] = block;
+      RETURN_NOT_OK(
+          MakeBlock(this->options_, type, this->table_->num_rows(), it.second, 
&block));
+      this->blocks_[type] = block;
     }
     return Status::OK();
   }
 
-  Status WriteTableToBlocks(int nthreads) {
-    auto WriteColumn = [this](int i) {
-      std::shared_ptr<Column> col = this->table_->column(i);
-      PandasBlock::type output_type = this->column_types_[i];
+  Status GetBlock(int i, std::shared_ptr<PandasBlock>* block) {
+    PandasBlock::type output_type = this->column_types_[i];
 
-      int rel_placement = this->column_block_placement_[i];
+    if (output_type == PandasBlock::CATEGORICAL) {
+      auto it = this->categorical_blocks_.find(i);
+      if (it == this->blocks_.end()) {
+        return Status::KeyError("No categorical block allocated");
+      }
+      *block = it->second;
+    } else if (output_type == PandasBlock::DATETIME_WITH_TZ) {
+      auto it = this->datetimetz_blocks_.find(i);
+      if (it == this->datetimetz_blocks_.end()) {
+        return Status::KeyError("No datetimetz block allocated");
+      }
+      *block = it->second;
+    } else {
+      auto it = this->blocks_.find(output_type);
+      if (it == this->blocks_.end()) {
+        return Status::KeyError("No block allocated");
+      }
+      *block = it->second;
+    }
+    return Status::OK();
+  }
 
+  Status WriteTableToBlocks(int nthreads) {
+    auto WriteColumn = [this](int i) {
       std::shared_ptr<PandasBlock> block;
-      if (output_type == PandasBlock::CATEGORICAL) {
-        auto it = this->categorical_blocks_.find(i);
-        if (it == this->blocks_.end()) {
-          return Status::KeyError("No categorical block allocated");
-        }
-        block = it->second;
-      } else if (output_type == PandasBlock::DATETIME_WITH_TZ) {
-        auto it = this->datetimetz_blocks_.find(i);
-        if (it == this->datetimetz_blocks_.end()) {
-          return Status::KeyError("No datetimetz block allocated");
-        }
-        block = it->second;
-      } else {
-        auto it = this->blocks_.find(output_type);
-        if (it == this->blocks_.end()) {
-          return Status::KeyError("No block allocated");
-        }
-        block = it->second;
-      }
-      return block->Write(col, i, rel_placement);
+      RETURN_NOT_OK(this->GetBlock(i, &block));
+      return block->Write(this->table_->column(i), i, 
this->column_block_placement_[i]);
     };
 
-    nthreads = std::min<int>(nthreads, table_->num_columns());
-
+    int num_tasks = table_->num_columns();
+    nthreads = std::min<int>(nthreads, num_tasks);
     if (nthreads == 1) {
-      for (int i = 0; i < table_->num_columns(); ++i) {
+      for (int i = 0; i < num_tasks; ++i) {
         RETURN_NOT_OK(WriteColumn(i));
       }
     } else {
-      std::vector<std::thread> thread_pool;
-      thread_pool.reserve(nthreads);
-      std::atomic<int> task_counter(0);
-
-      std::mutex error_mtx;
-      bool error_occurred = false;
-      Status error;
-
-      for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
-        thread_pool.emplace_back(
-            [this, &error, &error_occurred, &error_mtx, &task_counter, 
&WriteColumn]() {
-              int column_num;
-              while (!error_occurred) {
-                column_num = task_counter.fetch_add(1);
-                if (column_num >= this->table_->num_columns()) {
-                  break;
-                }
-                Status s = WriteColumn(column_num);
-                if (!s.ok()) {
-                  std::lock_guard<std::mutex> lock(error_mtx);
-                  error_occurred = true;
-                  error = s;
-                  break;
-                }
-              }
-            });
-      }
-      for (auto&& thread : thread_pool) {
-        thread.join();
-      }
-
-      if (error_occurred) {
-        return error;
-      }
+      RETURN_NOT_OK(ParallelFor(nthreads, num_tasks, WriteColumn));
     }
     return Status::OK();
   }
@@ -1354,6 +1358,11 @@ class DataFrameBlockCreator {
   // block type -> type count
   std::unordered_map<int, int> type_counts_;
 
+  PandasOptions options_;
+
+  // Memory pool for dictionary encoding
+  MemoryPool* pool_;
+
   // block type -> block
   BlockMap blocks_;
 
@@ -1366,8 +1375,9 @@ class DataFrameBlockCreator {
 
 class ArrowDeserializer {
  public:
-  ArrowDeserializer(const std::shared_ptr<Column>& col, PyObject* py_ref)
-      : col_(col), data_(*col->data().get()), py_ref_(py_ref) {}
+  ArrowDeserializer(PandasOptions options, const std::shared_ptr<Column>& col,
+                    PyObject* py_ref)
+      : col_(col), data_(*col->data().get()), options_(options), 
py_ref_(py_ref) {}
 
   Status AllocateOutput(int type) {
     PyAcquireGIL lock;
@@ -1378,7 +1388,8 @@ class ArrowDeserializer {
   }
 
   template <int TYPE>
-  Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr<Array> arr) {
+  Status ConvertValuesZeroCopy(PandasOptions options, int npy_type,
+                               std::shared_ptr<Array> arr) {
     typedef typename internal::arrow_traits<TYPE>::T T;
 
     const auto& prim_arr = static_cast<const PrimitiveArray&>(*arr);
@@ -1429,7 +1440,7 @@ class ArrowDeserializer {
     int npy_type = traits::npy_type;
 
     if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != 
nullptr) {
-      return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0));
+      return ConvertValuesZeroCopy<TYPE>(options_, npy_type, data_.chunk(0));
     }
 
     RETURN_NOT_OK(AllocateOutput(npy_type));
@@ -1482,17 +1493,17 @@ class ArrowDeserializer {
     typedef typename traits::T T;
 
     if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != 
nullptr) {
-      return ConvertValuesZeroCopy<TYPE>(traits::npy_type, data_.chunk(0));
+      return ConvertValuesZeroCopy<TYPE>(options_, traits::npy_type, 
data_.chunk(0));
     }
 
     if (data_.null_count() > 0) {
       RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
       auto out_values = reinterpret_cast<double*>(PyArray_DATA(arr_));
-      ConvertIntegerWithNulls<T>(data_, out_values);
+      ConvertIntegerWithNulls<T>(options_, data_, out_values);
     } else {
       RETURN_NOT_OK(AllocateOutput(traits::npy_type));
       auto out_values = reinterpret_cast<T*>(PyArray_DATA(arr_));
-      ConvertIntegerNoNullsSameType<T>(data_, out_values);
+      ConvertIntegerNoNullsSameType<T>(options_, data_, out_values);
     }
 
     return Status::OK();
@@ -1502,7 +1513,7 @@ class ArrowDeserializer {
   inline Status VisitObjects(FUNCTOR func) {
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
     auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-    return func(data_, out_values);
+    return func(options_, data_, out_values);
   }
 
   // UTF8 strings
@@ -1534,7 +1545,7 @@ class ArrowDeserializer {
     } else {
       
RETURN_NOT_OK(AllocateOutput(internal::arrow_traits<Type::BOOL>::npy_type));
       auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(arr_));
-      ConvertBooleanNoNulls(data_, out_values);
+      ConvertBooleanNoNulls(options_, data_, out_values);
     }
     return Status::OK();
   }
@@ -1542,7 +1553,7 @@ class ArrowDeserializer {
   Status Visit(const ListType& type) {
 #define CONVERTVALUES_LISTSLIKE_CASE(ArrowType, ArrowEnum) \
   case Type::ArrowEnum:                                    \
-    return ConvertListsLike<ArrowType>(col_, out_values);
+    return ConvertListsLike<ArrowType>(options_, col_, out_values);
 
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
     auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
@@ -1572,8 +1583,7 @@ class ArrowDeserializer {
   }
 
   Status Visit(const DictionaryType& type) {
-    std::shared_ptr<PandasBlock> block;
-    RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block));
+    auto block = std::make_shared<CategoricalBlock>(options_, nullptr, 
col_->length());
     RETURN_NOT_OK(block->Write(col_, 0, 0));
 
     auto dict_type = static_cast<const DictionaryType*>(col_->type().get());
@@ -1587,7 +1597,8 @@ class ArrowDeserializer {
     // Release GIL before calling ConvertArrayToPandas, will be reacquired
     // there if needed
     lock.release();
-    RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, 
&dictionary));
+    RETURN_NOT_OK(
+        ConvertArrayToPandas(options_, dict_type->dictionary(), nullptr, 
&dictionary));
     lock.acquire();
 
     PyDict_SetItemString(result_, "indices", block->block_arr());
@@ -1607,28 +1618,29 @@ class ArrowDeserializer {
  private:
   std::shared_ptr<Column> col_;
   const ChunkedArray& data_;
+  PandasOptions options_;
   PyObject* py_ref_;
   PyArrayObject* arr_;
   PyObject* result_;
 };
 
-Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* 
py_ref,
-                            PyObject** out) {
+Status ConvertArrayToPandas(PandasOptions options, const 
std::shared_ptr<Array>& arr,
+                            PyObject* py_ref, PyObject** out) {
   static std::string dummy_name = "dummy";
   auto field = std::make_shared<Field>(dummy_name, arr->type());
   auto col = std::make_shared<Column>(field, arr);
-  return ConvertColumnToPandas(col, py_ref, out);
+  return ConvertColumnToPandas(options, col, py_ref, out);
 }
 
-Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* 
py_ref,
-                             PyObject** out) {
-  ArrowDeserializer converter(col, py_ref);
+Status ConvertColumnToPandas(PandasOptions options, const 
std::shared_ptr<Column>& col,
+                             PyObject* py_ref, PyObject** out) {
+  ArrowDeserializer converter(options, col, py_ref);
   return converter.Convert(out);
 }
 
-Status ConvertTableToPandas(const std::shared_ptr<Table>& table, int nthreads,
-                            PyObject** out) {
-  DataFrameBlockCreator helper(table);
+Status ConvertTableToPandas(PandasOptions options, const 
std::shared_ptr<Table>& table,
+                            int nthreads, MemoryPool* pool, PyObject** out) {
+  DataFrameBlockCreator helper(options, table, pool);
   return helper.Convert(nthreads, out);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/python/arrow_to_pandas.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_pandas.h 
b/cpp/src/arrow/python/arrow_to_pandas.h
index 5a99274..1d716a5 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.h
+++ b/cpp/src/arrow/python/arrow_to_pandas.h
@@ -39,18 +39,18 @@ class Table;
 
 namespace py {
 
-ARROW_EXPORT
-Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* 
py_ref,
-                            PyObject** out);
-
-ARROW_EXPORT
-Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* 
py_ref,
-                             PyObject** out);
-
 struct PandasOptions {
   bool strings_to_categorical;
 };
 
+ARROW_EXPORT
+Status ConvertArrayToPandas(PandasOptions options, const 
std::shared_ptr<Array>& arr,
+                            PyObject* py_ref, PyObject** out);
+
+ARROW_EXPORT
+Status ConvertColumnToPandas(PandasOptions options, const 
std::shared_ptr<Column>& col,
+                             PyObject* py_ref, PyObject** out);
+
 // Convert a whole table as efficiently as possible to a pandas.DataFrame.
 //
 // The returned Python object is a list of tuples consisting of the exact 2D
@@ -58,8 +58,8 @@ struct PandasOptions {
 //
 // tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2])
 ARROW_EXPORT
-Status ConvertTableToPandas(const std::shared_ptr<Table>& table, int nthreads,
-                            PyObject** out);
+Status ConvertTableToPandas(PandasOptions options, const 
std::shared_ptr<Table>& table,
+                            int nthreads, MemoryPool* pool, PyObject** out);
 
 }  // namespace py
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/python/python-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python-test.cc 
b/cpp/src/arrow/python/python-test.cc
index dd95646..0d83012 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -92,7 +92,9 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
 
   PyObject* out;
   Py_BEGIN_ALLOW_THREADS;
-  ASSERT_RAISES(UnknownError, ConvertTableToPandas(table, 2, &out));
+  PandasOptions options;
+  MemoryPool* pool = default_memory_pool();
+  ASSERT_RAISES(UnknownError, ConvertTableToPandas(options, table, 2, pool, 
&out));
   Py_END_ALLOW_THREADS;
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/cpp/src/arrow/util/parallel.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
new file mode 100644
index 0000000..9fec000
--- /dev/null
+++ b/cpp/src/arrow/util/parallel.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_PARALLEL_H
+#define ARROW_UTIL_PARALLEL_H
+
+#include <atomic>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "arrow/status.h"
+
+namespace arrow {
+
+template <class FUNCTION>
+Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
+  std::vector<std::thread> thread_pool;
+  thread_pool.reserve(nthreads);
+  std::atomic<int> task_counter(0);
+
+  std::mutex error_mtx;
+  bool error_occurred = false;
+  Status error;
+
+  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+    thread_pool.emplace_back(
+        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, 
&func]() {
+          int task_id;
+          while (!error_occurred) {
+            task_id = task_counter.fetch_add(1);
+            if (task_id >= num_tasks) {
+              break;
+            }
+            Status s = func(task_id);
+            if (!s.ok()) {
+              std::lock_guard<std::mutex> lock(error_mtx);
+              error_occurred = true;
+              error = s;
+              break;
+            }
+          }
+        });
+  }
+  for (auto&& thread : thread_pool) {
+    thread.join();
+  }
+  if (error_occurred) {
+    return error;
+  }
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+#endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/python/pyarrow/array.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index c0c7ac6..20e778d 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -274,10 +274,15 @@ cdef class Array:
 
         return pyarrow_wrap_array(result)
 
-    def to_pandas(self):
+    def to_pandas(self, c_bool strings_to_categorical=False):
         """
         Convert to an array object suitable for use in pandas
 
+        Parameters
+        ----------
+        strings_to_categorical : boolean, default False
+            Encode string (UTF8) and binary types to pandas.Categorical
+
         See also
         --------
         Column.to_pandas
@@ -286,9 +291,12 @@ cdef class Array:
         """
         cdef:
             PyObject* out
+            PandasOptions options
 
+        options = PandasOptions(strings_to_categorical=strings_to_categorical)
         with nogil:
-            check_status(ConvertArrayToPandas(self.sp_array, self, &out))
+            check_status(ConvertArrayToPandas(options, self.sp_array,
+                                              self, &out))
         return wrap_array_output(out)
 
     def to_pylist(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 3ea4873..eed9640 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -740,14 +740,18 @@ cdef extern from "arrow/python/api.h" namespace 
"arrow::py" nogil:
     CStatus TensorToNdarray(const CTensor& tensor, object base,
                             PyObject** out)
 
-    CStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
+    CStatus ConvertArrayToPandas(PandasOptions options,
+                                 const shared_ptr[CArray]& arr,
                                  object py_ref, PyObject** out)
 
-    CStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
+    CStatus ConvertColumnToPandas(PandasOptions options,
+                                  const shared_ptr[CColumn]& arr,
                                   object py_ref, PyObject** out)
 
-    CStatus ConvertTableToPandas(const shared_ptr[CTable]& table,
-                                 int nthreads, PyObject** out)
+    CStatus ConvertTableToPandas(PandasOptions options,
+                                 const shared_ptr[CTable]& table,
+                                 int nthreads, CMemoryPool* pool,
+                                 PyObject** out)
 
     void c_set_default_memory_pool \
         " arrow::py::set_default_memory_pool"(CMemoryPool* pool)\
@@ -767,6 +771,9 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" 
nogil:
     cdef cppclass PyBytesReader(CBufferReader):
         PyBytesReader(object fo)
 
+    cdef struct PandasOptions:
+        c_bool strings_to_categorical
+
 
 cdef extern from 'arrow/python/init.h':
     int arrow_init_numpy() except -1

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index ddd5622..434b1c9 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -269,7 +269,7 @@ def maybe_coerce_datetime64(values, dtype, type_, 
timestamps_to_ms=False):
     return values, type_
 
 
-def table_to_blockmanager(table, nthreads=1):
+def table_to_blockmanager(options, table, memory_pool, nthreads=1):
     import pandas.core.internals as _int
     from pyarrow.compat import DatetimeTZDtype
     import pyarrow.lib as lib
@@ -305,17 +305,16 @@ def table_to_blockmanager(table, nthreads=1):
                 block_table.schema.get_field_index(name)
             )
 
-    result = lib.table_to_blocks(block_table, nthreads)
+    result = lib.table_to_blocks(options, block_table, nthreads, memory_pool)
 
     blocks = []
     for item in result:
         block_arr = item['block']
         placement = item['placement']
         if 'dictionary' in item:
-            ordered = block_table.schema[placement[0]].type.ordered
             cat = pd.Categorical(block_arr,
                                  categories=item['dictionary'],
-                                 ordered=ordered, fastpath=True)
+                                 ordered=item['ordered'], fastpath=True)
             block = _int.make_block(cat, placement=placement,
                                     klass=_int.CategoricalBlock,
                                     fastpath=True)

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index b9b0899..976f429 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -157,7 +157,7 @@ cdef class Column:
         sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
         return pyarrow_wrap_column(sp_column)
 
-    def to_pandas(self):
+    def to_pandas(self, strings_to_categorical=False):
         """
         Convert the arrow::Column to a pandas.Series
 
@@ -167,9 +167,13 @@ cdef class Column:
         """
         cdef:
             PyObject* out
+            PandasOptions options
+
+        options = PandasOptions(strings_to_categorical=strings_to_categorical)
 
         with nogil:
-            check_status(libarrow.ConvertColumnToPandas(self.sp_column,
+            check_status(libarrow.ConvertColumnToPandas(options,
+                                                        self.sp_column,
                                                         self, &out))
 
         return pd.Series(wrap_array_output(out), name=self.name)
@@ -580,15 +584,18 @@ cdef class RecordBatch:
         return pyarrow_wrap_batch(batch)
 
 
-def table_to_blocks(Table table, int nthreads):
+def table_to_blocks(PandasOptions options, Table table, int nthreads,
+                    MemoryPool memory_pool):
     cdef:
         PyObject* result_obj
         shared_ptr[CTable] c_table = table.sp_table
+        CMemoryPool* pool
 
+    pool = maybe_unbox_memory_pool(memory_pool)
     with nogil:
         check_status(
             libarrow.ConvertTableToPandas(
-                c_table, nthreads, &result_obj
+                options, c_table, nthreads, pool, &result_obj
             )
         )
 
@@ -790,7 +797,8 @@ cdef class Table:
 
         return pyarrow_wrap_table(c_table)
 
-    def to_pandas(self, nthreads=None):
+    def to_pandas(self, nthreads=None, strings_to_categorical=False,
+                  memory_pool=None):
         """
         Convert the arrow::Table to a pandas DataFrame
 
@@ -800,16 +808,23 @@ cdef class Table:
             For the default, we divide the CPU count by 2 because most modern
             computers have hyperthreading turned on, so doubling the CPU count
             beyond the number of physical cores does not help
+        strings_to_categorical : boolean, default False
+            Encode string (UTF8) and binary types to pandas.Categorical
+        memory_pool: MemoryPool, optional
+            Specific memory pool to use to allocate casted columns
 
         Returns
         -------
         pandas.DataFrame
         """
+        cdef:
+            PandasOptions options
+        options = PandasOptions(strings_to_categorical=strings_to_categorical)
         self._check_nullptr()
         if nthreads is None:
             nthreads = cpu_count()
-
-        mgr = pdcompat.table_to_blockmanager(self, nthreads)
+        mgr = pdcompat.table_to_blockmanager(options, self, memory_pool,
+                                             nthreads)
         return pd.DataFrame(mgr)
 
     def to_pydict(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/6e267012/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py 
b/python/pyarrow/tests/test_convert_pandas.py
index 93058fb..8969777 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -327,7 +327,7 @@ class TestPandasConversion(unittest.TestCase):
                 '2006-01-13T12:34:56.432',
                 '2010-08-13T05:46:57.437'],
                 dtype='datetime64[ms]')
-            })
+        })
         field = pa.field('datetime64', pa.timestamp('ms'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
@@ -342,7 +342,7 @@ class TestPandasConversion(unittest.TestCase):
                 '2006-01-13T12:34:56.432539784',
                 '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ns]')
-            })
+        })
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
@@ -369,7 +369,7 @@ class TestPandasConversion(unittest.TestCase):
                 None,
                 '2010-08-13T05:46:57.437'],
                 dtype='datetime64[ms]')
-            })
+        })
         field = pa.field('datetime64', pa.timestamp('ms'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
@@ -384,7 +384,7 @@ class TestPandasConversion(unittest.TestCase):
                 None,
                 '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ns]')
-            })
+        })
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
         self._check_pandas_roundtrip(
@@ -400,7 +400,7 @@ class TestPandasConversion(unittest.TestCase):
                 '2006-01-13T12:34:56.432',
                 '2010-08-13T05:46:57.437'],
                 dtype='datetime64[ms]')
-            })
+        })
         df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
                             .to_frame())
         self._check_pandas_roundtrip(df, timestamps_to_ms=True)
@@ -413,7 +413,7 @@ class TestPandasConversion(unittest.TestCase):
                 '2006-01-13T12:34:56.432539784',
                 '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ns]')
-            })
+        })
         df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
                             .to_frame())
         self._check_pandas_roundtrip(df, timestamps_to_ms=False)
@@ -462,7 +462,7 @@ class TestPandasConversion(unittest.TestCase):
         table_pandas = table.to_pandas()
 
         ex_values = (np.array(['2017-04-03', '2017-04-04', '2017-04-04',
-                              '2017-04-05'],
+                               '2017-04-05'],
                               dtype='datetime64[D]')
                      .astype('datetime64[ns]'))
         ex_values[1] = pd.NaT.value
@@ -491,10 +491,10 @@ class TestPandasConversion(unittest.TestCase):
         # TODO(jreback): Pandas only support ns resolution
         # Arrow supports ??? for resolution
         df = pd.DataFrame({
-            'timedelta': np.arange(start=0, stop=3*86400000,
+            'timedelta': np.arange(start=0, stop=3 * 86400000,
                                    step=86400000,
                                    dtype='timedelta64[ms]')
-            })
+        })
         pa.Table.from_pandas(df)
 
     def test_column_of_arrays(self):
@@ -920,6 +920,17 @@ class TestPandasConversion(unittest.TestCase):
         assert data_column['numpy_type'] == 'object'
         assert data_column['metadata'] == {'precision': 26, 'scale': 11}
 
+    def test_table_str_to_categorical(self):
+        values = [None, 'a', 'b', np.nan]
+        df = pd.DataFrame({'strings': values})
+        field = pa.field('strings', pa.string())
+        schema = pa.schema([field])
+        table = pa.Table.from_pandas(df, schema=schema)
+
+        result = table.to_pandas(strings_to_categorical=True)
+        expected = pd.DataFrame({'strings': pd.Categorical(values)})
+        tm.assert_frame_equal(result, expected, check_dtype=True)
+
 
 def _pytime_from_micros(val):
     microseconds = val % 1000000

Reply via email to