Repository: arrow Updated Branches: refs/heads/master 95ee96b92 -> 9e875a684
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index f96ead3..c37a1ce 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -17,10 +17,10 @@ # Arrow file and stream reader/writer classes, and other messaging tools -import pyarrow._io as _io +import pyarrow.lib as lib -class StreamReader(_io._StreamReader): +class StreamReader(lib._StreamReader): """ Reader for the Arrow streaming binary format @@ -37,7 +37,7 @@ class StreamReader(_io._StreamReader): yield self.get_next_batch() -class StreamWriter(_io._StreamWriter): +class StreamWriter(lib._StreamWriter): """ Writer for the Arrow streaming binary format @@ -52,7 +52,7 @@ class StreamWriter(_io._StreamWriter): self._open(sink, schema) -class FileReader(_io._FileReader): +class FileReader(lib._FileReader): """ Class for reading Arrow record batch data from the Arrow binary file format @@ -68,7 +68,7 @@ class FileReader(_io._FileReader): self._open(source, footer_offset=footer_offset) -class FileWriter(_io._FileWriter): +class FileWriter(lib._FileWriter): """ Writer to create the Arrow binary file format http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/lib.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd new file mode 100644 index 0000000..d3d03aa --- /dev/null +++ b/python/pyarrow/lib.pxd @@ -0,0 +1,337 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from cpython cimport PyObject + +cdef extern from "Python.h": + int PySlice_Check(object) + + +from pyarrow.includes.libarrow cimport CStatus + + +cdef int check_status(const CStatus& status) nogil except -1 + + +cdef class MemoryPool: + cdef: + CMemoryPool* pool + + cdef init(self, CMemoryPool* pool) + + +cdef class LoggingMemoryPool(MemoryPool): + pass + + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool) + + +cdef class DataType: + cdef: + shared_ptr[CDataType] sp_type + CDataType* type + + cdef void init(self, const shared_ptr[CDataType]& type) + + +cdef class DictionaryType(DataType): + cdef: + const CDictionaryType* dict_type + + +cdef class TimestampType(DataType): + cdef: + const CTimestampType* ts_type + + +cdef class Time32Type(DataType): + cdef: + const CTime32Type* time_type + + +cdef class Time64Type(DataType): + cdef: + const CTime64Type* time_type + + +cdef class FixedSizeBinaryType(DataType): + cdef: + const CFixedSizeBinaryType* fixed_size_binary_type + + +cdef class DecimalType(FixedSizeBinaryType): + cdef: + const CDecimalType* decimal_type + + +cdef class Field: + cdef: + shared_ptr[CField] sp_field + CField* field + + cdef readonly: + DataType type + + cdef init(self, const shared_ptr[CField]& field) + + +cdef class Schema: + cdef: + shared_ptr[CSchema] sp_schema + CSchema* schema + + cdef init(self, const vector[shared_ptr[CField]]& fields) + cdef init_schema(self, const shared_ptr[CSchema]& schema) + + +cdef class Scalar: + cdef 
readonly: + DataType type + + +cdef class NAType(Scalar): + pass + + +cdef class ArrayValue(Scalar): + cdef: + shared_ptr[CArray] sp_array + int64_t index + + cdef void init(self, DataType type, + const shared_ptr[CArray]& sp_array, int64_t index) + + cdef void _set_array(self, const shared_ptr[CArray]& sp_array) + + +cdef class Int8Value(ArrayValue): + pass + + +cdef class Int64Value(ArrayValue): + pass + + +cdef class ListValue(ArrayValue): + cdef readonly: + DataType value_type + + cdef: + CListArray* ap + + cdef getitem(self, int64_t i) + + +cdef class StringValue(ArrayValue): + pass + + +cdef class FixedSizeBinaryValue(ArrayValue): + pass + + +cdef class Array: + cdef: + shared_ptr[CArray] sp_array + CArray* ap + + cdef readonly: + DataType type + + cdef init(self, const shared_ptr[CArray]& sp_array) + cdef getitem(self, int64_t i) + + +cdef class Tensor: + cdef: + shared_ptr[CTensor] sp_tensor + CTensor* tp + + cdef readonly: + DataType type + + cdef init(self, const shared_ptr[CTensor]& sp_tensor) + + +cdef class NullArray(Array): + pass + + +cdef class BooleanArray(Array): + pass + + +cdef class NumericArray(Array): + pass + + +cdef class IntegerArray(NumericArray): + pass + + +cdef class FloatingPointArray(NumericArray): + pass + + +cdef class Int8Array(IntegerArray): + pass + + +cdef class UInt8Array(IntegerArray): + pass + + +cdef class Int16Array(IntegerArray): + pass + + +cdef class UInt16Array(IntegerArray): + pass + + +cdef class Int32Array(IntegerArray): + pass + + +cdef class UInt32Array(IntegerArray): + pass + + +cdef class Int64Array(IntegerArray): + pass + + +cdef class UInt64Array(IntegerArray): + pass + + +cdef class FloatArray(FloatingPointArray): + pass + + +cdef class DoubleArray(FloatingPointArray): + pass + + +cdef class FixedSizeBinaryArray(Array): + pass + + +cdef class DecimalArray(FixedSizeBinaryArray): + pass + + +cdef class ListArray(Array): + pass + + +cdef class StringArray(Array): + pass + + +cdef class BinaryArray(Array): + pass 
+ + +cdef class DictionaryArray(Array): + cdef: + object _indices, _dictionary + + +cdef wrap_array_output(PyObject* output) +cdef object box_scalar(DataType type, + const shared_ptr[CArray]& sp_array, + int64_t index) + + +cdef class ChunkedArray: + cdef: + shared_ptr[CChunkedArray] sp_chunked_array + CChunkedArray* chunked_array + + cdef init(self, const shared_ptr[CChunkedArray]& chunked_array) + cdef _check_nullptr(self) + + +cdef class Column: + cdef: + shared_ptr[CColumn] sp_column + CColumn* column + + cdef init(self, const shared_ptr[CColumn]& column) + cdef _check_nullptr(self) + + +cdef class Table: + cdef: + shared_ptr[CTable] sp_table + CTable* table + + cdef init(self, const shared_ptr[CTable]& table) + cdef _check_nullptr(self) + + +cdef class RecordBatch: + cdef: + shared_ptr[CRecordBatch] sp_batch + CRecordBatch* batch + Schema _schema + + cdef init(self, const shared_ptr[CRecordBatch]& table) + cdef _check_nullptr(self) + + +cdef class Buffer: + cdef: + shared_ptr[CBuffer] buffer + Py_ssize_t shape[1] + Py_ssize_t strides[1] + + cdef init(self, const shared_ptr[CBuffer]& buffer) + + +cdef class NativeFile: + cdef: + shared_ptr[RandomAccessFile] rd_file + shared_ptr[OutputStream] wr_file + bint is_readable + bint is_writeable + bint is_open + bint own_file + + # By implementing these "virtual" functions (all functions in Cython + # extension classes are technically virtual in the C++ sense) we can expose + # the arrow::io abstract file interfaces to other components throughout the + # suite of Arrow C++ libraries + cdef read_handle(self, shared_ptr[RandomAccessFile]* file) + cdef write_handle(self, shared_ptr[OutputStream]* file) + +cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader) +cdef get_writer(object source, shared_ptr[OutputStream]* writer) + +cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf) +cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type) +cdef public object 
pyarrow_wrap_field(const shared_ptr[CField]& field) +cdef public object pyarrow_wrap_schema(const shared_ptr[CSchema]& type) +cdef public object pyarrow_wrap_array(const shared_ptr[CArray]& sp_array) +cdef public object pyarrow_wrap_tensor(const shared_ptr[CTensor]& sp_tensor) +cdef public object pyarrow_wrap_column(const shared_ptr[CColumn]& ccolumn) +cdef public object pyarrow_wrap_table(const shared_ptr[CTable]& ctable) +cdef public object pyarrow_wrap_batch(const shared_ptr[CRecordBatch]& cbatch) http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/lib.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx new file mode 100644 index 0000000..ae311ac --- /dev/null +++ b/python/pyarrow/lib.pyx @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from cython.operator cimport dereference as deref +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.common cimport PyObject_to_object +cimport pyarrow.includes.libarrow as libarrow +cimport cpython as cp + + +import datetime +import decimal as _pydecimal +import numpy as np +import six +from pyarrow.compat import frombytes, tobytes, PandasSeries, Categorical + +cdef _pandas(): + import pandas as pd + return pd + + +arrow_init_numpy() + +import numpy as np +set_numpy_nan(np.nan) + +import multiprocessing +import os +cdef int CPU_COUNT = int( + os.environ.get('OMP_NUM_THREADS', + max(multiprocessing.cpu_count() // 2, 1))) + + +def cpu_count(): + """ + Returns + ------- + count : Number of CPUs to use by default in parallel operations. Default is + max(1, multiprocessing.cpu_count() / 2), but can be overridden by the + OMP_NUM_THREADS environment variable. For the default, we divide the CPU + count by 2 because most modern computers have hyperthreading turned on, + so doubling the CPU count beyond the number of physical cores does not + help. 
+ """ + return CPU_COUNT + +def set_cpu_count(count): + global CPU_COUNT + CPU_COUNT = max(int(count), 1) + + +# Exception types +include "error.pxi" + +# Memory pools and allocation +include "memory.pxi" + +# Array types +include "array.pxi" + +# Column, Table, Record Batch +include "table.pxi" + +# File IO, IPC +include "io.pxi" + +#---------------------------------------------------------------------- +# Public API + +include "public-api.pxi" http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/memory.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi new file mode 100644 index 0000000..15d59d2 --- /dev/null +++ b/python/pyarrow/memory.pxi @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + + +cdef class MemoryPool: + cdef init(self, CMemoryPool* pool): + self.pool = pool + + def bytes_allocated(self): + return self.pool.bytes_allocated() + + +cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool): + if memory_pool is None: + return c_get_memory_pool() + else: + return memory_pool.pool + + +cdef class LoggingMemoryPool(MemoryPool): + pass + + +def default_memory_pool(): + cdef: + MemoryPool pool = MemoryPool() + pool.init(c_get_memory_pool()) + return pool + + +def set_memory_pool(MemoryPool pool): + c_set_default_memory_pool(pool.pool) + + +def total_allocated_bytes(): + cdef CMemoryPool* pool = c_get_memory_pool() + return pool.bytes_allocated() http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 21359f1..050ec31 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -24,8 +24,7 @@ from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa RowGroupMetaData, ParquetSchema, ParquetWriter) import pyarrow._parquet as _parquet # noqa -import pyarrow._array as _array -import pyarrow._table as _table +import pyarrow.lib as lib # ---------------------------------------------------------------------- @@ -241,8 +240,8 @@ class ParquetDatasetPiece(object): # manifest, so ['a', 'b', 'c'] as in our example above. 
dictionary = partitions.levels[i].dictionary - arr = _array.DictionaryArray.from_arrays(indices, dictionary) - col = _table.Column.from_array(name, arr) + arr = lib.DictionaryArray.from_arrays(indices, dictionary) + col = lib.Column.from_array(name, arr) table = table.append_column(col) return table @@ -298,9 +297,9 @@ class PartitionSet(object): # Only integer and string partition types are supported right now try: integer_keys = [int(x) for x in self.keys] - dictionary = _array.array(integer_keys) + dictionary = lib.array(integer_keys) except ValueError: - dictionary = _array.array(self.keys) + dictionary = lib.array(self.keys) self._dictionary = dictionary return dictionary @@ -539,7 +538,7 @@ class ParquetDataset(object): open_file_func=open_file) tables.append(table) - all_data = _table.concat_tables(tables) + all_data = lib.concat_tables(tables) return all_data def _get_open_file_func(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/public-api.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi new file mode 100644 index 0000000..7b55651 --- /dev/null +++ b/python/pyarrow/public-api.pxi @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from libcpp.memory cimport shared_ptr +from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CField, + CRecordBatch, CSchema, + CTable, CTensor) + + +cdef public api object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf): + cdef Buffer result = Buffer() + result.init(buf) + return result + + +cdef public api object pyarrow_wrap_data_type( + const shared_ptr[CDataType]& type): + cdef: + DataType out + + if type.get() == NULL: + return None + + if type.get().id() == _Type_DICTIONARY: + out = DictionaryType() + elif type.get().id() == _Type_TIMESTAMP: + out = TimestampType() + elif type.get().id() == _Type_FIXED_SIZE_BINARY: + out = FixedSizeBinaryType() + elif type.get().id() == _Type_DECIMAL: + out = DecimalType() + else: + out = DataType() + + out.init(type) + return out + + +cdef public api object pyarrow_wrap_field(const shared_ptr[CField]& field): + if field.get() == NULL: + return None + cdef Field out = Field() + out.init(field) + return out + + +cdef public api object pyarrow_wrap_schema(const shared_ptr[CSchema]& type): + cdef Schema out = Schema() + out.init_schema(type) + return out + + +cdef public api object pyarrow_wrap_array(const shared_ptr[CArray]& sp_array): + if sp_array.get() == NULL: + raise ValueError('Array was NULL') + + cdef CDataType* data_type = sp_array.get().type().get() + + if data_type == NULL: + raise ValueError('Array data type was NULL') + + cdef Array arr = _array_classes[data_type.id()]() + arr.init(sp_array) + return arr + + +cdef public api object pyarrow_wrap_tensor( + const shared_ptr[CTensor]& sp_tensor): + if sp_tensor.get() == NULL: + raise ValueError('Tensor was NULL') + + cdef Tensor tensor = Tensor() + tensor.init(sp_tensor) + return tensor + + +cdef public api object pyarrow_wrap_column(const shared_ptr[CColumn]& ccolumn): + cdef Column column = Column() + column.init(ccolumn) + return column + + 
+cdef public api object pyarrow_wrap_table(const shared_ptr[CTable]& ctable): + cdef Table table = Table() + table.init(ctable) + return table + + +cdef public api object pyarrow_wrap_batch( + const shared_ptr[CRecordBatch]& cbatch): + cdef RecordBatch batch = RecordBatch() + batch.init(cbatch) + return batch http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/table.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi new file mode 100644 index 0000000..8dd18cf --- /dev/null +++ b/python/pyarrow/table.pxi @@ -0,0 +1,884 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from collections import OrderedDict + + +cdef class ChunkedArray: + """ + Array backed via one or more memory chunks. + + Warning + ------- + Do not call this class's constructor directly. + """ + + def __cinit__(self): + self.chunked_array = NULL + + cdef init(self, const shared_ptr[CChunkedArray]& chunked_array): + self.sp_chunked_array = chunked_array + self.chunked_array = chunked_array.get() + + cdef _check_nullptr(self): + if self.chunked_array == NULL: + raise ReferenceError("ChunkedArray object references a NULL " + "pointer. 
Not initialized.") + + def length(self): + self._check_nullptr() + return self.chunked_array.length() + + def __len__(self): + return self.length() + + @property + def null_count(self): + """ + Number of null entires + + Returns + ------- + int + """ + self._check_nullptr() + return self.chunked_array.null_count() + + @property + def num_chunks(self): + """ + Number of underlying chunks + + Returns + ------- + int + """ + self._check_nullptr() + return self.chunked_array.num_chunks() + + def chunk(self, i): + """ + Select a chunk by its index + + Parameters + ---------- + i : int + + Returns + ------- + pyarrow.Array + """ + self._check_nullptr() + return pyarrow_wrap_array(self.chunked_array.chunk(i)) + + def iterchunks(self): + for i in range(self.num_chunks): + yield self.chunk(i) + + def to_pylist(self): + """ + Convert to a list of native Python objects. + """ + result = [] + for i in range(self.num_chunks): + result += self.chunk(i).to_pylist() + return result + + +cdef class Column: + """ + Named vector of elements of equal type. + + Warning + ------- + Do not call this class's constructor directly. 
+ """ + + def __cinit__(self): + self.column = NULL + + cdef init(self, const shared_ptr[CColumn]& column): + self.sp_column = column + self.column = column.get() + + @staticmethod + def from_array(object field_or_name, Array arr): + cdef Field boxed_field + + if isinstance(field_or_name, Field): + boxed_field = field_or_name + else: + boxed_field = field(field_or_name, arr.type) + + cdef shared_ptr[CColumn] sp_column + sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array)) + return pyarrow_wrap_column(sp_column) + + def to_pandas(self): + """ + Convert the arrow::Column to a pandas.Series + + Returns + ------- + pandas.Series + """ + cdef: + PyObject* out + + check_status(libarrow.ConvertColumnToPandas(self.sp_column, + self, &out)) + + return _pandas().Series(wrap_array_output(out), name=self.name) + + def equals(self, Column other): + """ + Check if contents of two columns are equal + + Parameters + ---------- + other : pyarrow.Column + + Returns + ------- + are_equal : boolean + """ + cdef: + CColumn* my_col = self.column + CColumn* other_col = other.column + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_col.Equals(deref(other_col)) + + return result + + def to_pylist(self): + """ + Convert to a list of native Python objects. + """ + return self.data.to_pylist() + + cdef _check_nullptr(self): + if self.column == NULL: + raise ReferenceError("Column object references a NULL pointer." 
+ "Not initialized.") + + def __len__(self): + self._check_nullptr() + return self.column.length() + + def length(self): + self._check_nullptr() + return self.column.length() + + @property + def shape(self): + """ + Dimensions of this columns + + Returns + ------- + (int,) + """ + self._check_nullptr() + return (self.length(),) + + @property + def null_count(self): + """ + Number of null entires + + Returns + ------- + int + """ + self._check_nullptr() + return self.column.null_count() + + @property + def name(self): + """ + Label of the column + + Returns + ------- + str + """ + return bytes(self.column.name()).decode('utf8') + + @property + def type(self): + """ + Type information for this column + + Returns + ------- + pyarrow.DataType + """ + return pyarrow_wrap_data_type(self.column.type()) + + @property + def data(self): + """ + The underlying data + + Returns + ------- + pyarrow.ChunkedArray + """ + cdef ChunkedArray chunked_array = ChunkedArray() + chunked_array.init(self.column.data()) + return chunked_array + + +cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict( + dict metadata): + cdef: + unordered_map[c_string, c_string] unordered_metadata = metadata + return (<shared_ptr[const CKeyValueMetadata]> + make_shared[CKeyValueMetadata](unordered_metadata)) + + +cdef int _schema_from_arrays( + arrays, names, dict metadata, shared_ptr[CSchema]* schema) except -1: + cdef: + Array arr + Column col + c_string c_name + vector[shared_ptr[CField]] fields + shared_ptr[CDataType] type_ + int K = len(arrays) + + fields.resize(K) + + if len(arrays) == 0: + raise ValueError('Must pass at least one array') + + if isinstance(arrays[0], Array): + if names is None: + raise ValueError('Must pass names when constructing ' + 'from Array objects') + for i in range(K): + arr = arrays[i] + type_ = arr.type.sp_type + c_name = tobytes(names[i]) + fields[i].reset(new CField(c_name, type_, True)) + elif isinstance(arrays[0], Column): + for i in range(K): + col = 
arrays[i] + type_ = col.sp_column.get().type() + c_name = tobytes(col.name) + fields[i].reset(new CField(c_name, type_, True)) + else: + raise TypeError(type(arrays[0])) + + schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata))) + return 0 + + +cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema): + cdef: + list names = [] + list arrays = [] + DataType type = None + dict metadata = {} + + for name in df.columns: + col = df[name] + if schema is not None: + type = schema.field_by_name(name).type + + arr = Array.from_pandas(col, type=type, + timestamps_to_ms=timestamps_to_ms) + names.append(name) + arrays.append(arr) + + return names, arrays, metadata + + +cdef class RecordBatch: + """ + Batch of rows of columns of equal length + + Warning + ------- + Do not call this class's constructor directly, use one of the ``from_*`` + methods instead. + """ + + def __cinit__(self): + self.batch = NULL + self._schema = None + + cdef init(self, const shared_ptr[CRecordBatch]& batch): + self.sp_batch = batch + self.batch = batch.get() + + cdef _check_nullptr(self): + if self.batch == NULL: + raise ReferenceError("Object not initialized") + + def __len__(self): + self._check_nullptr() + return self.batch.num_rows() + + @property + def num_columns(self): + """ + Number of columns + + Returns + ------- + int + """ + self._check_nullptr() + return self.batch.num_columns() + + @property + def num_rows(self): + """ + Number of rows + + Due to the definition of a RecordBatch, all columns have the same + number of rows. 
+ + Returns + ------- + int + """ + return len(self) + + @property + def schema(self): + """ + Schema of the RecordBatch and its columns + + Returns + ------- + pyarrow.Schema + """ + cdef Schema schema + self._check_nullptr() + if self._schema is None: + schema = Schema() + schema.init_schema(self.batch.schema()) + self._schema = schema + + return self._schema + + def __getitem__(self, i): + return pyarrow_wrap_array(self.batch.column(i)) + + def slice(self, offset=0, length=None): + """ + Compute zero-copy slice of this RecordBatch + + Parameters + ---------- + offset : int, default 0 + Offset from start of array to slice + length : int, default None + Length of slice (default is until end of batch starting from + offset) + + Returns + ------- + sliced : RecordBatch + """ + cdef shared_ptr[CRecordBatch] result + + if offset < 0: + raise IndexError('Offset must be non-negative') + + if length is None: + result = self.batch.Slice(offset) + else: + result = self.batch.Slice(offset, length) + + return pyarrow_wrap_batch(result) + + def equals(self, RecordBatch other): + cdef: + CRecordBatch* my_batch = self.batch + CRecordBatch* other_batch = other.batch + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_batch.Equals(deref(other_batch)) + + return result + + def to_pydict(self): + """ + Converted the arrow::RecordBatch to an OrderedDict + + Returns + ------- + OrderedDict + """ + entries = [] + for i in range(self.batch.num_columns()): + name = bytes(self.batch.column_name(i)).decode('utf8') + column = self[i].to_pylist() + entries.append((name, column)) + return OrderedDict(entries) + + + def to_pandas(self, nthreads=None): + """ + Convert the arrow::RecordBatch to a pandas DataFrame + + Returns + ------- + pandas.DataFrame + """ + return Table.from_batches([self]).to_pandas(nthreads=nthreads) + + @classmethod + def from_pandas(cls, df, schema=None): + """ + Convert pandas.DataFrame to an Arrow RecordBatch + + Parameters 
+ ---------- + df: pandas.DataFrame + schema: pyarrow.Schema (optional) + The expected schema of the RecordBatch. This can be used to + indicate the type of columns if we cannot infer it automatically. + + Returns + ------- + pyarrow.RecordBatch + """ + names, arrays, metadata = _dataframe_to_arrays(df, False, schema) + return cls.from_arrays(arrays, names, metadata) + + @staticmethod + def from_arrays(list arrays, list names, dict metadata=None): + """ + Construct a RecordBatch from multiple pyarrow.Arrays + + Parameters + ---------- + arrays: list of pyarrow.Array + column-wise data vectors + names: list of str + Labels for the columns + + Returns + ------- + pyarrow.RecordBatch + """ + cdef: + Array arr + c_string c_name + shared_ptr[CSchema] schema + shared_ptr[CRecordBatch] batch + vector[shared_ptr[CArray]] c_arrays + int64_t num_rows + int64_t i + int64_t number_of_arrays = len(arrays) + + if not number_of_arrays: + raise ValueError('Record batch cannot contain no arrays (for now)') + + num_rows = len(arrays[0]) + _schema_from_arrays(arrays, names, metadata or {}, &schema) + + c_arrays.reserve(len(arrays)) + for arr in arrays: + c_arrays.push_back(arr.sp_array) + + batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) + return pyarrow_wrap_batch(batch) + + +cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): + cdef: + PyObject* result_obj + CColumn* col + int i + + import pandas.core.internals as _int + from pandas import RangeIndex, Categorical + from pyarrow.compat import DatetimeTZDtype + + with nogil: + check_status(libarrow.ConvertTableToPandas(table, nthreads, + &result_obj)) + + result = PyObject_to_object(result_obj) + + blocks = [] + for item in result: + block_arr = item['block'] + placement = item['placement'] + if 'dictionary' in item: + cat = Categorical(block_arr, + categories=item['dictionary'], + ordered=False, fastpath=True) + block = _int.make_block(cat, placement=placement, + klass=_int.CategoricalBlock, + 
fastpath=True) + elif 'timezone' in item: + dtype = DatetimeTZDtype('ns', tz=item['timezone']) + block = _int.make_block(block_arr, placement=placement, + klass=_int.DatetimeTZBlock, + dtype=dtype, fastpath=True) + else: + block = _int.make_block(block_arr, placement=placement) + blocks.append(block) + + names = [] + for i in range(table.get().num_columns()): + col = table.get().column(i).get() + names.append(frombytes(col.name())) + + axes = [names, RangeIndex(table.get().num_rows())] + return _int.BlockManager(blocks, axes) + + +cdef class Table: + """ + A collection of top-level named, equal length Arrow arrays. + + Warning + ------- + Do not call this class's constructor directly, use one of the ``from_*`` + methods instead. A directly constructed Table wraps a NULL pointer, and + its methods raise ReferenceError instead of touching the missing data. + """ + + def __cinit__(self): + self.table = NULL + + def __repr__(self): + return 'pyarrow.Table\n{0}'.format(str(self.schema)) + + cdef init(self, const shared_ptr[CTable]& table): + # sp_table owns the C++ table; the raw pointer is a cached alias of it + self.sp_table = table + self.table = table.get() + + cdef _check_nullptr(self): + if self.table == NULL: + raise ReferenceError("Table object references a NULL pointer. " + "Not initialized.") + + def equals(self, Table other not None): + """ + Check if contents of two tables are equal + + Parameters + ---------- + other : pyarrow.Table + + Returns + ------- + are_equal : boolean + + Raises + ------ + TypeError + If other is None or not a Table + ReferenceError + If either table is not initialized + """ + cdef: + CTable* my_table = self.table + CTable* other_table = other.table + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_table.Equals(deref(other_table)) + + return result + + @classmethod + def from_pandas(cls, df, timestamps_to_ms=False, schema=None): + """ + Convert pandas.DataFrame to an Arrow Table + + Parameters + ---------- + df: pandas.DataFrame + + timestamps_to_ms: bool + Convert datetime columns to ms resolution. This is needed for + compatibility with other functionality like Parquet I/O which + only supports milliseconds. + + schema: pyarrow.Schema (optional) + The expected schema of the Arrow Table.
This can be used to + indicate the type of columns if we cannot infer it automatically. + + Returns + ------- + pyarrow.Table + + Examples + -------- + + >>> import pandas as pd + >>> import pyarrow as pa + >>> df = pd.DataFrame({ + ... 'int': [1, 2], + ... 'str': ['a', 'b'] + ... }) + >>> pa.Table.from_pandas(df) + <pyarrow.lib.Table object at 0x7f05d1fb1b40> + """ + names, arrays, metadata = _dataframe_to_arrays(df, + timestamps_to_ms=timestamps_to_ms, + schema=schema) + return cls.from_arrays(arrays, names=names, metadata=metadata) + + @staticmethod + def from_arrays(arrays, names=None, dict metadata=None): + """ + Construct a Table from Arrow arrays or columns + + Parameters + ---------- + arrays: list of pyarrow.Array or pyarrow.Column + Equal-length arrays that should form the table. + names: list of str, optional + Names for the table columns. If Columns passed, will be + inferred. If Arrays passed, this argument is required. + metadata: dict, optional + Metadata forwarded to the schema built for the table + + Returns + ------- + pyarrow.Table + + Raises + ------ + ValueError + If an element of arrays is neither an Array nor a Column + """ + cdef: + vector[shared_ptr[CColumn]] columns + shared_ptr[CSchema] schema + shared_ptr[CTable] table + size_t K = len(arrays) + + _schema_from_arrays(arrays, names, metadata or {}, &schema) + + columns.reserve(K) + + for i in range(K): + if isinstance(arrays[i], Array): + columns.push_back( + make_shared[CColumn]( + schema.get().field(i), + (<Array> arrays[i]).sp_array + ) + ) + elif isinstance(arrays[i], Column): + columns.push_back((<Column> arrays[i]).sp_column) + else: + raise ValueError(type(arrays[i])) + + table.reset(new CTable(schema, columns)) + return pyarrow_wrap_table(table) + + @staticmethod + def from_batches(batches): + """ + Construct a Table from a list of Arrow RecordBatches + + Parameters + ---------- + + batches: list of RecordBatch + RecordBatch list to be converted, schemas must be equal + """ + cdef: + vector[shared_ptr[CRecordBatch]] c_batches + shared_ptr[CTable] c_table + RecordBatch batch + + for batch in batches: + c_batches.push_back(batch.sp_batch) + + with
nogil: + check_status(CTable.FromRecordBatches(c_batches, &c_table)) + + return pyarrow_wrap_table(c_table) + + def to_pandas(self, nthreads=None): + """ + Convert the arrow::Table to a pandas DataFrame + + Parameters + ---------- + nthreads : int, default max(1, multiprocessing.cpu_count() / 2) + For the default, we divide the CPU count by 2 because most modern + computers have hyperthreading turned on, so doubling the CPU count + beyond the number of physical cores does not help + + Returns + ------- + pandas.DataFrame + + Raises + ------ + ReferenceError + If the table is not initialized + """ + # table_to_blockmanager dereferences sp_table, so fail cleanly first + self._check_nullptr() + if nthreads is None: + nthreads = cpu_count() + + mgr = table_to_blockmanager(self.sp_table, nthreads) + return _pandas().DataFrame(mgr) + + def to_pydict(self): + """ + Convert the arrow::Table to an OrderedDict mapping each column's + name to the result of that column's to_pylist(), in column order + + Returns + ------- + OrderedDict + """ + self._check_nullptr() + entries = [] + for i in range(self.table.num_columns()): + column = self.column(i) + entries.append((column.name, column.to_pylist())) + return OrderedDict(entries) + + @property + def schema(self): + """ + Schema of the table and its columns + + Returns + ------- + pyarrow.Schema + """ + self._check_nullptr() + return pyarrow_wrap_schema(self.table.schema()) + + def column(self, index): + """ + Select a column by its numeric index. + + Parameters + ---------- + index: int + Position of the column; negative values count from the end. + + Returns + ------- + pyarrow.Column + + Raises + ------ + IndexError + If index is out of range + ReferenceError + If the table is not initialized + """ + cdef: + Column column = Column() + int num_columns + int i + + self._check_nullptr() + num_columns = self.table.num_columns() + i = index + # Validate here rather than relying on the C++ accessor; raising + # IndexError also lets __getitem__-based iteration stop cleanly. + if i < 0: + i += num_columns + if i < 0 or i >= num_columns: + raise IndexError('Table column index {0} is out of range ' + '(table has {1} columns)'.format(index, num_columns)) + column.init(self.table.column(i)) + return column + + def __getitem__(self, i): + return self.column(i) + + def itercolumns(self): + """ + Iterator over all columns in their numerical order + """ + for i in range(self.num_columns): + yield self.column(i) + + @property + def num_columns(self): + """ + Number of columns in this table + + Returns + ------- + int + """ + self._check_nullptr() + return self.table.num_columns() + + @property + def num_rows(self): + """ + Number of rows in this table. + + Due to the definition of a table, all columns have the same number of rows.
+ + Returns + ------- + int + """ + self._check_nullptr() + return self.table.num_rows() + + def __len__(self): + return self.num_rows + + @property + def shape(self): + """ + Dimensions of the table: (#rows, #columns) + + Returns + ------- + (int, int) + """ + return (self.num_rows, self.num_columns) + + def add_column(self, int i, Column column not None): + """ + Add column to Table at position. Returns new table + + Parameters + ---------- + i : int + Position at which the column is inserted + column : pyarrow.Column + Column to insert + + Returns + ------- + pyarrow.Table + """ + cdef: + shared_ptr[CTable] c_table + + self._check_nullptr() + + with nogil: + check_status(self.table.AddColumn(i, column.sp_column, &c_table)) + + return pyarrow_wrap_table(c_table) + + def append_column(self, Column column): + """ + Append column at end of columns. Returns new table + + Parameters + ---------- + column : pyarrow.Column + Column to append + + Returns + ------- + pyarrow.Table + """ + return self.add_column(self.num_columns, column) + + def remove_column(self, int i): + """ + Create new Table with the indicated column removed + + Parameters + ---------- + i : int + Position of the column to remove + + Returns + ------- + pyarrow.Table + """ + cdef shared_ptr[CTable] c_table + + self._check_nullptr() + + with nogil: + check_status(self.table.RemoveColumn(i, &c_table)) + + return pyarrow_wrap_table(c_table) + + +def concat_tables(tables): + """ + Perform zero-copy concatenation of pyarrow.Table objects.
Raises an exception + if the Table schemas are not all the same + + Parameters + ---------- + tables : iterable of pyarrow.Table objects + + Returns + ------- + pyarrow.Table + + Raises + ------ + TypeError + If an element of tables is None or not a Table + ReferenceError + If an element of tables is not initialized + """ + cdef: + vector[shared_ptr[CTable]] c_tables + shared_ptr[CTable] c_result + Table table + + for table in tables: + # A None entry is accepted by the typed loop variable; reject it + # before its C-level fields are read + if table is None: + raise TypeError('concat_tables expects Table objects, got None') + table._check_nullptr() + c_tables.push_back(table.sp_table) + + with nogil: + check_status(ConcatenateTables(c_tables, &c_result)) + + return pyarrow_wrap_table(c_result) http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/tests/test_feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py index 7a8abf4..69c32be 100644 --- a/python/pyarrow/tests/test_feather.py +++ b/python/pyarrow/tests/test_feather.py @@ -27,7 +27,7 @@ import pyarrow as pa from pyarrow.compat import guid from pyarrow.feather import (read_feather, write_feather, FeatherReader) -from pyarrow._io import FeatherWriter +from pyarrow.lib import FeatherWriter def random_path(): http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 148224a..b38fca4 100644 --- a/python/setup.py +++ b/python/setup.py @@ -106,14 +106,9 @@ class build_ext(_build_ext): os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0')) CYTHON_MODULE_NAMES = [ - '_array', - '_config', - '_error', - '_io', + 'lib', '_jemalloc', - '_memory', - '_parquet', - '_table'] + '_parquet'] def _run_cmake(self): # The directory containing this setup.py
