Repository: arrow Updated Branches: refs/heads/master 1079a3206 -> ab5f66a2e
ARROW-428: [Python] Multithreaded conversion from Arrow table to pandas.DataFrame This yields a substantial speedup on my laptop. On a 1GB numeric dataset, with 1 thread (the default prior to this patch): ``` >>> %timeit df2 = table.to_pandas(nthreads=1) 1 loop, best of 3: 498 ms per loop ``` With 4 threads (this is a true quad-core machine): ``` >>> %timeit df2 = table.to_pandas(nthreads=4) 1 loop, best of 3: 151 ms per loop ``` By default, the number of threads used is `multiprocessing.cpu_count()` divided by 2 (with a minimum of 1), since hyperthreading doesn't help with this largely memory-bound operation. Author: Wes McKinney <[email protected]> Closes #252 from wesm/ARROW-428 and squashes the following commits: da929bf [Wes McKinney] Factor out common compiler flag code between Arrow C++ and Python CMake files. Add pyarrow.cpu_count/set_cpu_count functions per feedback cad89e9 [Wes McKinney] Tweak pyarrow cmake flags e70f16d [Wes McKinney] Add missing GIL acquisition. Do not spawn too many threads if few columns bc4dff7 [Wes McKinney] Return errors from threaded conversion. Add doc about number of cpus used 79f5fd9 [Wes McKinney] Implement multithreaded conversion from Arrow table to pandas.DataFrame. 
Default to multiprocessing.cpu_count for now Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ab5f66a2 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ab5f66a2 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ab5f66a2 Branch: refs/heads/master Commit: ab5f66a2e9a2b6af312ffdfa2f95c65b1d6f5739 Parents: 1079a32 Author: Wes McKinney <[email protected]> Authored: Wed Dec 28 07:49:06 2016 -0500 Committer: Wes McKinney <[email protected]> Committed: Wed Dec 28 07:49:06 2016 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 71 +------------ cpp/cmake_modules/SetupCxxFlags.cmake | 86 ++++++++++++++++ python/CMakeLists.txt | 36 +------ python/pyarrow/__init__.py | 1 + python/pyarrow/config.pyx | 23 +++++ python/pyarrow/table.pyx | 38 +++---- python/pyarrow/tests/test_convert_pandas.py | 42 ++++++-- python/src/pyarrow/adapters/pandas.cc | 121 ++++++++++++++++------- 8 files changed, 250 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 93e9853..4507e67 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -105,76 +105,7 @@ endif() # Compiler flags ############################################################ -# Check if the target architecture and compiler supports some special -# instruction sets that would boost performance. -include(CheckCXXCompilerFlag) -# x86/amd64 compiler flags -CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3) -# power compiler flags -CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC) - -# compiler flags that are common across debug/release builds -# - Wall: Enable all warnings. 
-set(CXX_COMMON_FLAGS "-std=c++11 -Wall") - -# Only enable additional instruction sets if they are supported -if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3) - set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3") -endif() -if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC) - set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec") -endif() - -if (APPLE) - # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be - # the default standard library which does not support C++11. libc++ is the - # default from 10.9 onward. - set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++") -endif() - -# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .') -# For all builds: -# For CMAKE_BUILD_TYPE=Debug -# -ggdb: Enable gdb debugging -# For CMAKE_BUILD_TYPE=FastDebug -# Same as DEBUG, except with some optimizations on. -# For CMAKE_BUILD_TYPE=Release -# -O3: Enable all compiler optimizations -# -g: Enable symbols for profiler tools (TODO: remove for shipping) -if (NOT MSVC) - set(CXX_FLAGS_DEBUG "-ggdb -O0") - set(CXX_FLAGS_FASTDEBUG "-ggdb -O1") - set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG") -endif() - -set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate") -set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use") - -# if no build build type is specified, default to debug builds -if (NOT CMAKE_BUILD_TYPE) - set(CMAKE_BUILD_TYPE Debug) -endif(NOT CMAKE_BUILD_TYPE) - -string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE) - - -# Set compile flags based on the build type. 
-message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})") -if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}") -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}") -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}") -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}") -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}") -else() - message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") -endif () - -message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}") +include(SetupCxxFlags) # Add common flags set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/cmake_modules/SetupCxxFlags.cmake ---------------------------------------------------------------------- diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake new file mode 100644 index 0000000..ee672bd --- /dev/null +++ b/cpp/cmake_modules/SetupCxxFlags.cmake @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Check if the target architecture and compiler supports some special +# instruction sets that would boost performance. +include(CheckCXXCompilerFlag) +# x86/amd64 compiler flags +CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3) +# power compiler flags +CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC) + +# compiler flags that are common across debug/release builds +# - Wall: Enable all warnings. +set(CXX_COMMON_FLAGS "-std=c++11 -Wall") + +# Only enable additional instruction sets if they are supported +if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3") +endif() +if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec") +endif() + +if (APPLE) + # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be + # the default standard library which does not support C++11. libc++ is the + # default from 10.9 onward. + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++") +endif() + +# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .') +# For all builds: +# For CMAKE_BUILD_TYPE=Debug +# -ggdb: Enable gdb debugging +# For CMAKE_BUILD_TYPE=FastDebug +# Same as DEBUG, except with some optimizations on. 
+# For CMAKE_BUILD_TYPE=Release +# -O3: Enable all compiler optimizations +# -g: Enable symbols for profiler tools (TODO: remove for shipping) +if (NOT MSVC) + set(CXX_FLAGS_DEBUG "-ggdb -O0") + set(CXX_FLAGS_FASTDEBUG "-ggdb -O1") + set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG") +endif() + +set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate") +set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use") + +# if no build build type is specified, default to debug builds +if (NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Debug) +endif(NOT CMAKE_BUILD_TYPE) + +string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE) + +# Set compile flags based on the build type. +message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})") +if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}") +elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}") +elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}") +elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}") +elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}") +else() + message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") +endif () + +message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}") http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 6ad55f8..6c24772 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -65,41 +65,7 @@ endif(CCACHE_FOUND) # Compiler flags ############################################################ -# compiler flags that are common across debug/release builds 
-set(CXX_COMMON_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall") - -# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .') -# For all builds: -# For CMAKE_BUILD_TYPE=Debug -# -ggdb: Enable gdb debugging -# For CMAKE_BUILD_TYPE=FastDebug -# Same as DEBUG, except with some optimizations on. -# For CMAKE_BUILD_TYPE=Release -# -O3: Enable all compiler optimizations -# -g: Enable symbols for profiler tools (TODO: remove for shipping) -# -DNDEBUG: Turn off dchecks/asserts/debug only code. -set(CXX_FLAGS_DEBUG "-ggdb -O0") -set(CXX_FLAGS_FASTDEBUG "-ggdb -O1") -set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG") - -# if no build build type is specified, default to debug builds -if (NOT CMAKE_BUILD_TYPE) - set(CMAKE_BUILD_TYPE Debug) -endif(NOT CMAKE_BUILD_TYPE) - -string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE) - -# Set compile flags based on the build type. -message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})") -if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") - set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG}) -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG") - set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG}) -elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE") - set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE}) -else() - message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") -endif () +include(SetupCxxFlags) # Add common flags set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 9ede934..6f81ef4 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -26,6 +26,7 @@ except DistributionNotFound: import pyarrow.config +from pyarrow.config import cpu_count, set_cpu_count from pyarrow.array import (Array, from_pandas_series, from_pylist, 
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/config.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx index 778c15a..aa30f09 100644 --- a/python/pyarrow/config.pyx +++ b/python/pyarrow/config.pyx @@ -29,3 +29,26 @@ pyarrow_init() import numpy as np pyarrow_set_numpy_nan(np.nan) + +import multiprocessing +import os +cdef int CPU_COUNT = int( + os.environ.get('OMP_NUM_THREADS', + max(multiprocessing.cpu_count() // 2, 1))) + +def cpu_count(): + """ + Returns + ------- + count : Number of CPUs to use by default in parallel operations. Default is + max(1, multiprocessing.cpu_count() / 2), but can be overridden by the + OMP_NUM_THREADS environment variable. For the default, we divide the CPU + count by 2 because most modern computers have hyperthreading turned on, + so doubling the CPU count beyond the number of physical cores does not + help. + """ + return CPU_COUNT + +def set_cpu_count(count): + global CPU_COUNT + CPU_COUNT = max(int(count), 1) http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 9375557..20137e3 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -439,7 +439,9 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): from pandas.core.internals import BlockManager, make_block from pandas import RangeIndex - check_status(pyarrow.ConvertTableToPandas(table, nthreads, &result_obj)) + with nogil: + check_status(pyarrow.ConvertTableToPandas(table, nthreads, + &result_obj)) result = PyObject_to_object(result_obj) @@ -610,36 +612,28 @@ cdef class Table: table.init(c_table) return table - def to_pandas(self, nthreads=1, block_based=True): + def to_pandas(self, nthreads=None): """ Convert the arrow::Table to a pandas DataFrame + 
Parameters + ---------- + nthreads : int, default max(1, multiprocessing.cpu_count() / 2) + For the default, we divide the CPU count by 2 because most modern + computers have hyperthreading turned on, so doubling the CPU count + beyond the number of physical cores does not help + Returns ------- pandas.DataFrame """ - cdef: - PyObject* arr - shared_ptr[CColumn] col - Column column - import pandas as pd - if block_based: - mgr = table_to_blockmanager(self.sp_table, nthreads) - return pd.DataFrame(mgr) - else: - names = [] - data = [] - for i in range(self.table.num_columns()): - col = self.table.column(i) - column = self.column(i) - check_status(pyarrow.ConvertColumnToPandas( - col, <PyObject*> column, &arr)) - names.append(frombytes(col.get().name())) - data.append(PyObject_to_object(arr)) - - return pd.DataFrame(dict(zip(names, data)), columns=names) + if nthreads is None: + nthreads = pyarrow.config.cpu_count() + + mgr = table_to_blockmanager(self.sp_table, nthreads) + return pd.DataFrame(mgr) @property def name(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/tests/test_convert_pandas.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index da34f85..863aa30 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -27,6 +27,29 @@ from pyarrow.compat import u import pyarrow as A +def _alltypes_example(size=100): + return pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': 
np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0, + # TODO(wesm): Pandas only support ns resolution, Arrow supports s, ms, + # us, ns + 'datetime': np.arange("2016-01-01T00:00:00.001", size, + dtype='datetime64[ms]'), + 'str': [str(x) for x in range(size)], + 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None], + 'empty_str': [''] * size + }) + + class TestPandasConversion(unittest.TestCase): def setUp(self): @@ -35,10 +58,10 @@ class TestPandasConversion(unittest.TestCase): def tearDown(self): pass - def _check_pandas_roundtrip(self, df, expected=None, + def _check_pandas_roundtrip(self, df, expected=None, nthreads=1, timestamps_to_ms=False): table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms) - result = table.to_pandas() + result = table.to_pandas(nthreads=nthreads) if expected is None: expected = df tm.assert_frame_equal(result, expected) @@ -217,18 +240,21 @@ class TestPandasConversion(unittest.TestCase): def test_date(self): df = pd.DataFrame({ - 'date': [ - datetime.date(2000, 1, 1), - None, - datetime.date(1970, 1, 1), - datetime.date(2040, 2, 26) - ]}) + 'date': [datetime.date(2000, 1, 1), + None, + datetime.date(1970, 1, 1), + datetime.date(2040, 2, 26)]}) table = A.from_pandas_dataframe(df) result = table.to_pandas() expected = df.copy() expected['date'] = pd.to_datetime(df['date']) tm.assert_frame_equal(result, expected) + def test_threaded_conversion(self): + df = _alltypes_example() + self._check_pandas_roundtrip(df, nthreads=2, + timestamps_to_ms=False) + # def test_category(self): # repeats = 1000 # values = [b'foo', None, u'bar', 'qux', np.nan] http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 899eb55..5e5826b 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ 
b/python/src/pyarrow/adapters/pandas.cc @@ -19,15 +19,18 @@ #include <Python.h> -#include "pyarrow/numpy_interop.h" - #include "pyarrow/adapters/pandas.h" +#include "pyarrow/numpy_interop.h" +#include <algorithm> +#include <atomic> #include <cmath> #include <cstdint> #include <memory> +#include <mutex> #include <sstream> #include <string> +#include <thread> #include <unordered_map> #include "arrow/api.h" @@ -1031,7 +1034,8 @@ class PandasBlock { : num_rows_(num_rows), num_columns_(num_columns) {} virtual Status Allocate() = 0; - virtual Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) = 0; + virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) = 0; PyObject* block_arr() { return block_arr_.obj(); } @@ -1057,7 +1061,6 @@ class PandasBlock { block_arr_.reset(block_arr); placement_arr_.reset(placement_arr); - current_placement_index_ = 0; block_data_ = reinterpret_cast<uint8_t*>( PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr))); @@ -1070,7 +1073,6 @@ class PandasBlock { int64_t num_rows_; int num_columns_; - int current_placement_index_; OwnedRef block_arr_; uint8_t* block_data_; @@ -1088,11 +1090,12 @@ class ObjectBlock : public PandasBlock { Status Allocate() override { return AllocateNDArray(NPY_OBJECT); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; PyObject** out_buffer = - reinterpret_cast<PyObject**>(block_data_) + current_placement_index_ * num_rows_; + reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_; const ChunkedArray& data = *col->data().get(); @@ -1108,7 +1111,7 @@ class ObjectBlock : public PandasBlock { return Status::NotImplemented(ss.str()); } - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return 
Status::OK(); } }; @@ -1122,18 +1125,19 @@ class IntBlock : public PandasBlock { return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; C_TYPE* out_buffer = - reinterpret_cast<C_TYPE*>(block_data_) + current_placement_index_ * num_rows_; + reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_; const ChunkedArray& data = *col->data().get(); if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); } ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer); - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return Status::OK(); } }; @@ -1153,16 +1157,16 @@ class Float32Block : public PandasBlock { Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); } - float* out_buffer = - reinterpret_cast<float*>(block_data_) + current_placement_index_ * num_rows_; + float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_; ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer); - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return Status::OK(); } }; @@ -1173,11 +1177,12 @@ class Float64Block : public PandasBlock { Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t 
abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; double* out_buffer = - reinterpret_cast<double*>(block_data_) + current_placement_index_ * num_rows_; + reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_; const ChunkedArray& data = *col->data().get(); @@ -1214,7 +1219,7 @@ class Float64Block : public PandasBlock { #undef INTEGER_CASE - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return Status::OK(); } }; @@ -1225,16 +1230,17 @@ class BoolBlock : public PandasBlock { Status Allocate() override { return AllocateNDArray(NPY_BOOL); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); } uint8_t* out_buffer = - reinterpret_cast<uint8_t*>(block_data_) + current_placement_index_ * num_rows_; + reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_; ConvertBooleanNoNulls(*col->data().get(), out_buffer); - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return Status::OK(); } }; @@ -1253,11 +1259,12 @@ class DatetimeBlock : public PandasBlock { return Status::OK(); } - Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override { + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { Type::type type = col->type()->type; int64_t* out_buffer = - reinterpret_cast<int64_t*>(block_data_) + current_placement_index_ * num_rows_; + reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_; const ChunkedArray& data = *col.get()->data(); @@ -1283,7 +1290,7 @@ class DatetimeBlock : public PandasBlock { return Status::NotImplemented(col->type()->ToString()); 
} - placement_data_[current_placement_index_++] = placement; + placement_data_[rel_placement] = abs_placement; return Status::OK(); } }; @@ -1333,6 +1340,7 @@ class DataFrameBlockCreator { Status Convert(int nthreads, PyObject** output) { column_types_.resize(table_->num_columns()); + column_block_placement_.resize(table_->num_columns()); type_counts_.clear(); blocks_.clear(); @@ -1397,7 +1405,9 @@ class DataFrameBlockCreator { } auto it = type_counts_.find(output_type); + int block_placement = 0; if (it != type_counts_.end()) { + block_placement = it->second; // Increment count it->second += 1; } else { @@ -1406,6 +1416,7 @@ class DataFrameBlockCreator { } column_types_[i] = output_type; + column_block_placement_[i] = block_placement; } return Status::OK(); } @@ -1421,22 +1432,61 @@ class DataFrameBlockCreator { } Status WriteTableToBlocks(int nthreads) { - if (nthreads > 1) { - return Status::NotImplemented("multithreading not yet implemented"); - } + auto WriteColumn = [this](int i) { + std::shared_ptr<Column> col = this->table_->column(i); + PandasBlock::type output_type = this->column_types_[i]; - for (int i = 0; i < table_->num_columns(); ++i) { - std::shared_ptr<Column> col = table_->column(i); - PandasBlock::type output_type = column_types_[i]; + int rel_placement = this->column_block_placement_[i]; + + auto it = this->blocks_.find(output_type); + if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); } + return it->second->Write(col, i, rel_placement); + }; - auto it = blocks_.find(output_type); - if (it == blocks_.end()) { return Status::KeyError("No block allocated"); } - RETURN_NOT_OK(it->second->WriteNext(col, i)); + nthreads = std::min<int>(nthreads, table_->num_columns()); + + if (nthreads == 1) { + for (int i = 0; i < table_->num_columns(); ++i) { + RETURN_NOT_OK(WriteColumn(i)); + } + } else { + std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); + + std::mutex 
error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() { + int column_num; + while (!error_occurred) { + column_num = task_counter.fetch_add(1); + if (column_num >= this->table_->num_columns()) { break; } + Status s = WriteColumn(column_num); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + + if (error_occurred) { return error; } } return Status::OK(); } Status GetResultList(PyObject** out) { + PyAcquireGIL lock; + auto num_blocks = static_cast<Py_ssize_t>(blocks_.size()); PyObject* result = PyList_New(num_blocks); RETURN_IF_PYERROR(); @@ -1463,8 +1513,13 @@ class DataFrameBlockCreator { private: std::shared_ptr<Table> table_; + + // column num -> block type id std::vector<PandasBlock::type> column_types_; + // column num -> relative placement within internal block + std::vector<int> column_block_placement_; + // block type -> type count std::unordered_map<int, int> type_counts_;
