This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 148213e  ARROW-3162: Flight Python bindings
148213e is described below

commit 148213e3befb02ae345cf5f62cb40d7f8720178a
Author: David Li <[email protected]>
AuthorDate: Thu Feb 14 18:16:56 2019 +0100

    ARROW-3162: Flight Python bindings
    
    - [ ] Add docs
    - [ ] Format code
    - [ ] Include Python in integration tests (requires binding the JSON 
reader/writer from C++)
    - [ ] Validate performance?
    - [ ] Complete server bindings if approach makes sense
    
    Author: David Li <[email protected]>
    Author: Wes McKinney <[email protected]>
    
    Closes #3566 from lihalite/flight-python and squashes the following commits:
    
    ac29ab88 <David Li> Clean up to-be-implemented parts of Flight Python 
bindings
    9d5442a0 <David Li> Clarify various RecordBatchStream{Reader,Writer} 
wrappers
    e1c298ad <David Li> Lint CMake files
    77644447 <Wes McKinney> Reformat cmake
    c6b02aa9 <David Li> Add basic Python bindings for Flight
---
 cpp/cmake_modules/FindArrow.cmake           |  14 +
 cpp/src/arrow/flight/CMakeLists.txt         |   2 +-
 cpp/src/arrow/python/CMakeLists.txt         |  13 +
 cpp/src/arrow/python/flight.cc              |  86 +++++
 cpp/src/arrow/python/flight.h               |  79 +++++
 python/CMakeLists.txt                       |  25 ++
 python/examples/flight/client.py            | 139 ++++++++
 python/examples/flight/server.py            |  56 ++++
 python/pyarrow/_flight.pyx                  | 476 ++++++++++++++++++++++++++++
 python/pyarrow/flight.py                    |  20 ++
 python/pyarrow/includes/common.pxd          |   8 +-
 python/pyarrow/includes/libarrow_flight.pxd | 142 +++++++++
 python/pyarrow/ipc.pxi                      |  94 ++++--
 python/pyarrow/ipc.py                       |   4 +-
 python/pyarrow/lib.pxd                      |  10 +
 python/requirements.txt                     |   1 +
 python/setup.py                             |  13 +-
 17 files changed, 1141 insertions(+), 41 deletions(-)

diff --git a/cpp/cmake_modules/FindArrow.cmake 
b/cpp/cmake_modules/FindArrow.cmake
index f4b0a81..3d5fd8e 100644
--- a/cpp/cmake_modules/FindArrow.cmake
+++ b/cpp/cmake_modules/FindArrow.cmake
@@ -79,6 +79,14 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
   NO_DEFAULT_PATH)
 get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)
 
+if (PYARROW_BUILD_FLIGHT)
+  find_library(ARROW_FLIGHT_LIB_PATH NAMES arrow_flight
+    PATHS
+    ${ARROW_SEARCH_LIB_PATH}
+    NO_DEFAULT_PATH)
+  get_filename_component(ARROW_FLIGHT_LIBS ${ARROW_FLIGHT_LIB_PATH} DIRECTORY)
+endif()
+
 if (MSVC)
   SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")
 
@@ -101,19 +109,25 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
   set(ARROW_FOUND TRUE)
   set(ARROW_LIB_NAME arrow)
   set(ARROW_PYTHON_LIB_NAME arrow_python)
+  set(ARROW_FLIGHT_LIB_NAME arrow_flight)
   if (MSVC)
     set(ARROW_STATIC_LIB 
${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_STATIC_LIB 
${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
+    set(ARROW_FLIGHT_STATIC_LIB 
${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
     set(ARROW_SHARED_LIB 
${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_SHARED_LIB 
${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+    set(ARROW_FLIGHT_SHARED_LIB 
${ARROW_FLIGHT_SHARED_LIBS}/${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib)
     set(ARROW_PYTHON_SHARED_IMP_LIB 
${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib)
+    set(ARROW_FLIGHT_SHARED_IMP_LIB 
${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}.lib)
   else()
     set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a)
     set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a)
+    set(ARROW_FLIGHT_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}.a)
 
     set(ARROW_SHARED_LIB 
${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
     set(ARROW_PYTHON_SHARED_LIB 
${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+    set(ARROW_FLIGHT_SHARED_LIB 
${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
   endif()
 endif()
 
diff --git a/cpp/src/arrow/flight/CMakeLists.txt 
b/cpp/src/arrow/flight/CMakeLists.txt
index 9183e26..a51f4fe 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -21,7 +21,7 @@ add_custom_target(arrow_flight)
 arrow_install_all_headers("arrow/flight")
 
 set(ARROW_FLIGHT_STATIC_LINK_LIBS
-    protobuf_static
+    ${PROTOBUF_LIBRARY}
     grpc_grpcpp_static
     grpc_grpc_static
     grpc_gpr_static
diff --git a/cpp/src/arrow/python/CMakeLists.txt 
b/cpp/src/arrow/python/CMakeLists.txt
index 93dbd66..9cf7eeb 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -44,12 +44,25 @@ set(ARROW_PYTHON_SRCS
     pyarrow.cc
     serialize.cc)
 
+if(ARROW_FLIGHT)
+  set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc)
+endif()
+
 if("${COMPILER_FAMILY}" STREQUAL "clang")
   set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS " 
-Wno-cast-qual ")
 endif()
 
 set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS})
 
+if(ARROW_FLIGHT)
+  # Must link shared: we don't want to link more than one copy of gRPC
+  # into the eventual Cython shared object, otherwise gRPC calls fail
+  # with weird errors due to multiple copies of global static state
+  # (The other solution is to link gRPC shared everywhere instead of
+  # statically only in Flight)
+  set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} 
arrow_flight_shared)
+endif()
+
 if(WIN32)
   set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} 
${PYTHON_LIBRARIES})
 endif()
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
new file mode 100644
index 0000000..5550333
--- /dev/null
+++ b/cpp/src/arrow/python/flight.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <utility>
+
+#include "arrow/flight/internal.h"
+#include "arrow/python/flight.h"
+
+namespace arrow {
+namespace py {
+namespace flight {
+
+PyFlightServer::PyFlightServer(PyObject* server, PyFlightServerVtable vtable)
+    : vtable_(vtable) {
+  Py_INCREF(server);
+  server_.reset(server);
+}
+
+Status PyFlightServer::ListFlights(
+    const arrow::flight::Criteria* criteria,
+    std::unique_ptr<arrow::flight::FlightListing>* listings) {
+  return Status::NotImplemented("NYI");
+}
+
+Status PyFlightServer::GetFlightInfo(const arrow::flight::FlightDescriptor& 
request,
+                                     
std::unique_ptr<arrow::flight::FlightInfo>* info) {
+  PyAcquireGIL lock;
+  vtable_.get_flight_info(server_.obj(), request, info);
+  return CheckPyError();
+}
+
+Status PyFlightServer::DoGet(const arrow::flight::Ticket& request,
+                             std::unique_ptr<arrow::flight::FlightDataStream>* 
stream) {
+  PyAcquireGIL lock;
+  vtable_.do_get(server_.obj(), request, stream);
+  return CheckPyError();
+}
+
+Status 
PyFlightServer::DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> 
reader) {
+  PyAcquireGIL lock;
+  vtable_.do_put(server_.obj(), std::move(reader));
+  return CheckPyError();
+}
+
+Status PyFlightServer::DoAction(const arrow::flight::Action& action,
+                                std::unique_ptr<arrow::flight::ResultStream>* 
result) {
+  return Status::NotImplemented("NYI");
+}
+
+Status PyFlightServer::ListActions(std::vector<arrow::flight::ActionType>* 
actions) {
+  return Status::NotImplemented("NYI");
+}
+
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+                        const arrow::flight::FlightDescriptor& descriptor,
+                        const std::vector<arrow::flight::FlightEndpoint>& 
endpoints,
+                        uint64_t total_records, uint64_t total_bytes,
+                        std::unique_ptr<arrow::flight::FlightInfo>* out) {
+  arrow::flight::FlightInfo::Data flight_data;
+  RETURN_NOT_OK(arrow::flight::internal::SchemaToString(*schema, 
&flight_data.schema));
+  flight_data.descriptor = descriptor;
+  flight_data.endpoints = endpoints;
+  flight_data.total_records = total_records;
+  flight_data.total_bytes = total_bytes;
+  arrow::flight::FlightInfo value(flight_data);
+  *out = std::unique_ptr<arrow::flight::FlightInfo>(new 
arrow::flight::FlightInfo(value));
+  return Status::OK();
+}
+
+}  // namespace flight
+}  // namespace py
+}  // namespace arrow
diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h
new file mode 100644
index 0000000..774cf70
--- /dev/null
+++ b/cpp/src/arrow/python/flight.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PYARROW_FLIGHT_H
+#define PYARROW_FLIGHT_H
+
+#include <memory>
+#include <vector>
+
+#include "arrow/flight/api.h"
+#include "arrow/python/common.h"
+#include "arrow/python/config.h"
+
+namespace arrow {
+
+namespace py {
+
+namespace flight {
+
+/// \brief A table of function pointers for calling from C++ into
+/// Python.
+class ARROW_PYTHON_EXPORT PyFlightServerVtable {
+ public:
+  std::function<void(PyObject*, const arrow::flight::FlightDescriptor&,
+                     std::unique_ptr<arrow::flight::FlightInfo>*)>
+      get_flight_info;
+  std::function<void(PyObject*, 
std::unique_ptr<arrow::flight::FlightMessageReader>)>
+      do_put;
+  std::function<void(PyObject*, const arrow::flight::Ticket&,
+                     std::unique_ptr<arrow::flight::FlightDataStream>*)>
+      do_get;
+};
+
+class ARROW_PYTHON_EXPORT PyFlightServer : public 
arrow::flight::FlightServerBase {
+ public:
+  explicit PyFlightServer(PyObject* server, PyFlightServerVtable vtable);
+
+  Status ListFlights(const arrow::flight::Criteria* criteria,
+                     std::unique_ptr<arrow::flight::FlightListing>* listings) 
override;
+  Status GetFlightInfo(const arrow::flight::FlightDescriptor& request,
+                       std::unique_ptr<arrow::flight::FlightInfo>* info) 
override;
+  Status DoGet(const arrow::flight::Ticket& request,
+               std::unique_ptr<arrow::flight::FlightDataStream>* stream) 
override;
+  Status DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> reader) 
override;
+  Status DoAction(const arrow::flight::Action& action,
+                  std::unique_ptr<arrow::flight::ResultStream>* result) 
override;
+  Status ListActions(std::vector<arrow::flight::ActionType>* actions) override;
+
+ private:
+  OwnedRefNoGIL server_;
+  PyFlightServerVtable vtable_;
+};
+
+ARROW_PYTHON_EXPORT
+Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
+                        const arrow::flight::FlightDescriptor& descriptor,
+                        const std::vector<arrow::flight::FlightEndpoint>& 
endpoints,
+                        uint64_t total_records, uint64_t total_bytes,
+                        std::unique_ptr<arrow::flight::FlightInfo>* out);
+
+}  // namespace flight
+}  // namespace py
+}  // namespace arrow
+
+#endif  // PYARROW_FLIGHT_H
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 0559261..63a8cd0 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -59,6 +59,7 @@ endif()
 # Top level cmake dir
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
+  option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
   option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF)
   option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF)
   option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where 
relevant" ON)
@@ -191,6 +192,10 @@ include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS} 
${PYTHON_INCLUDE_DIRS} src)
 # Dependencies
 #
 
+if(PYARROW_BUILD_FLIGHT)
+  set(ARROW_FLIGHT TRUE)
+endif()
+
 # Arrow
 find_package(Arrow REQUIRED)
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
@@ -352,9 +357,15 @@ endif()
 if(MSVC)
   add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB})
   add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB})
+  if(PYARROW_BUILD_FLIGHT)
+    add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB})
+  endif()
 else()
   add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB})
   add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB})
+  if(PYARROW_BUILD_FLIGHT)
+    add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB})
+  endif()
 endif()
 
 #
@@ -474,6 +485,20 @@ if(PYARROW_BUILD_ORC)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc)
 endif()
 
+# Flight
+if(PYARROW_BUILD_FLIGHT)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    # TODO:
+    message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in 
pyarrow")
+  endif()
+  # We do NOT want to link gRPC or any other Flight dependency
+  # here. Linking more than one copy leads to odd runtime errors due
+  # to multiple copies of static global state. Thus we also need to
+  # link Flight as a shared object.
+  set(LINK_LIBS ${LINK_LIBS} arrow_flight_shared)
+  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
+endif()
+
 # Gandiva
 if(PYARROW_BUILD_GANDIVA)
   find_package(Gandiva)
diff --git a/python/examples/flight/client.py b/python/examples/flight/client.py
new file mode 100644
index 0000000..df9acc1
--- /dev/null
+++ b/python/examples/flight/client.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight CLI client."""
+
+import argparse
+import sys
+
+import pyarrow
+import pyarrow.flight
+
+
+def list_flights(args, client):
+    print('Flights\n=======')
+    for flight in client.list_flights():
+        descriptor = flight.descriptor
+        if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
+            print("Path:", descriptor.path)
+        elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
+            print("Command:", descriptor.command)
+        else:
+            print("Unknown descriptor type")
+
+        print("Total records:", end=" ")
+        if flight.total_records >= 0:
+            print(flight.total_records)
+        else:
+            print("Unknown")
+
+        print("Total bytes:", end=" ")
+        if flight.total_bytes >= 0:
+            print(flight.total_bytes)
+        else:
+            print("Unknown")
+
+        print("Number of endpoints:", len(flight.endpoints))
+
+        if args.list:
+            print(flight.schema)
+
+        print('---')
+
+    print('\nActions\n=======')
+    for action in client.list_actions():
+        print("Type:", action.type)
+        print("Description:", action.description)
+        print('---')
+
+
+def do_action(args, client):
+    try:
+        buf = pyarrow.allocate_buffer(0)
+        action = pyarrow.flight.Action(args.action_type, buf)
+        print('Running action', args.action_type)
+        for result in client.do_action(action):
+            print("Got result", result.body.to_pybytes())
+    except pyarrow.lib.ArrowIOError as e:
+        print("Error calling action:", e)
+
+
+def get_flight(args, client):
+    if args.path:
+        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
+    else:
+        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)
+
+    info = client.get_flight_info(descriptor)
+    for endpoint in info.endpoints:
+        print('Ticket:', endpoint.ticket)
+        for location in endpoint.locations:
+            print(location)
+            get_client = pyarrow.flight.FlightClient.connect(location)
+            reader = get_client.do_get(endpoint.ticket, info.schema)
+            df = reader.read_pandas()
+            print(df)
+
+
+def _add_common_arguments(parser):
+    parser.add_argument('host', type=str,
+                        help="The host to connect to.")
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    subcommands = parser.add_subparsers()
+
+    cmd_list = subcommands.add_parser('list')
+    cmd_list.set_defaults(action='list')
+    _add_common_arguments(cmd_list)
+    cmd_list.add_argument('-l', '--list', action='store_true',
+                          help="Print more details.")
+
+    cmd_do = subcommands.add_parser('do')
+    cmd_do.set_defaults(action='do')
+    _add_common_arguments(cmd_do)
+    cmd_do.add_argument('action_type', type=str,
+                        help="The action type to run.")
+
+    cmd_get = subcommands.add_parser('get')
+    cmd_get.set_defaults(action='get')
+    _add_common_arguments(cmd_get)
+    cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True)
+    cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append',
+                                    help="The path for the descriptor.")
+    cmd_get_descriptor.add_argument('-c', '--command', type=str,
+                                    help="The command for the descriptor.")
+
+    args = parser.parse_args()
+    if not hasattr(args, 'action'):
+        parser.print_help()
+        sys.exit(1)
+
+    commands = {
+        'list': list_flights,
+        'do': do_action,
+        'get': get_flight,
+    }
+    host, port = args.host.split(':')
+    port = int(port)
+    client = pyarrow.flight.FlightClient.connect(host, port)
+    commands[args.action](args, client)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/examples/flight/server.py b/python/examples/flight/server.py
new file mode 100644
index 0000000..cf8668a
--- /dev/null
+++ b/python/examples/flight/server.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight Python server."""
+
+import pyarrow
+import pyarrow.flight
+
+
+class FlightServer(pyarrow.flight.FlightServerBase):
+    def __init__(self):
+        super(FlightServer, self).__init__()
+        self.flights = {}
+
+    @classmethod
+    def descriptor_to_key(self, descriptor):
+        return (descriptor.descriptor_type, descriptor.command,
+                tuple(descriptor.path or tuple()))
+
+    def get_flight_info(self, descriptor):
+        key = FlightServer.descriptor_to_key(descriptor)
+        if key in self.flights:
+            table = self.flights[key]
+            return pyarrow.flight.FlightInfo(table.schema,
+                                             descriptor, [],
+                                             table.num_rows, 0)
+        raise KeyError('Flight not found.')
+
+    def do_put(self, descriptor, reader):
+        key = FlightServer.descriptor_to_key(descriptor)
+        print(key)
+        self.flights[key] = reader.read_all()
+        print(self.flights[key])
+
+
+def main():
+    server = FlightServer()
+    server.run(5005)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
new file mode 100644
index 0000000..4819171
--- /dev/null
+++ b/python/pyarrow/_flight.pyx
@@ -0,0 +1,476 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import collections
+import enum
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.compat import frombytes, tobytes
+from pyarrow.lib cimport *
+from pyarrow.lib import as_buffer
+from pyarrow.includes.libarrow_flight cimport *
+from pyarrow.ipc import _ReadPandasOption
+import pyarrow.lib as lib
+
+
+cdef class Action:
+    """An action executable on a Flight service."""
+    cdef:
+        CAction action
+
+    def __init__(self, action_type, buf):
+        self.action.type = tobytes(action_type)
+        self.action.body = pyarrow_unwrap_buffer(as_buffer(buf))
+
+    @property
+    def type(self):
+        return frombytes(self.action.type)
+
+    def body(self):
+        return pyarrow_wrap_buffer(self.action.body)
+
+
+cdef class ActionType:
+    """A type of action executable on a Flight service."""
+    cdef:
+        CActionType action_type
+
+    @property
+    def type(self):
+        return frombytes(self.action_type.type)
+
+    @property
+    def description(self):
+        return frombytes(self.action_type.description)
+
+    def make_action(self, buf):
+        """Create an Action with this type."""
+        return Action(self.type, buf)
+
+    def __repr__(self):
+        return '<ActionType type={} description={}>'.format(
+            self.type, self.description)
+
+
+cdef class Result:
+    """A result from executing an Action."""
+    cdef:
+        unique_ptr[CResult] result
+
+    @property
+    def body(self):
+        """Get the Buffer containing the result."""
+        return pyarrow_wrap_buffer(self.result.get().body)
+
+
+class DescriptorType(enum.Enum):
+    UNKNOWN = 0
+    PATH = 1
+    CMD = 2
+
+
+cdef class FlightDescriptor:
+    """A description of a data stream available from a Flight service."""
+    cdef:
+        CFlightDescriptor descriptor
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly, use "
+                        "`pyarrow.flight.FlightDescriptor."
+                        "for_{{path,command}}` function instead."
+                        .format(self.__class__.__name__))
+
+    @staticmethod
+    def for_path(*path):
+        """Create a FlightDescriptor for a resource path."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypePath
+        result.descriptor.path = [tobytes(p) for p in path]
+        return result
+
+    @staticmethod
+    def for_command(command):
+        """Create a FlightDescriptor for an opaque command."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor.type = CDescriptorTypeCmd
+        result.descriptor.cmd = tobytes(command)
+        return result
+
+    @property
+    def descriptor_type(self):
+        if self.descriptor.type == CDescriptorTypeUnknown:
+            return DescriptorType.UNKNOWN
+        elif self.descriptor.type == CDescriptorTypePath:
+            return DescriptorType.PATH
+        elif self.descriptor.type == CDescriptorTypeCmd:
+            return DescriptorType.CMD
+        raise RuntimeError("Invalid descriptor type!")
+
+    @property
+    def command(self):
+        """Get the command for this descriptor."""
+        if self.descriptor_type != DescriptorType.CMD:
+            return None
+        return self.descriptor.cmd
+
+    @property
+    def path(self):
+        """Get the path for this descriptor."""
+        if self.descriptor_type != DescriptorType.PATH:
+            return None
+        return self.descriptor.path
+
+    def __repr__(self):
+        return "<FlightDescriptor type: {!r}>".format(self.descriptor_type)
+
+
+class Ticket:
+    """A ticket for requesting a Flight stream."""
+    def __init__(self, ticket):
+        self.ticket = ticket
+
+    def __repr__(self):
+        return '<Ticket {}>'.format(self.ticket)
+
+
+class Location(collections.namedtuple('Location', ['host', 'port'])):
+    """A location where a Flight stream is available."""
+
+
+cdef class FlightEndpoint:
+    """A Flight stream, along with the ticket and locations to access it."""
+    cdef:
+        CFlightEndpoint endpoint
+
+    def __init__(self, ticket, locations):
+        """Create a FlightEndpoint from a ticket and list of locations.
+
+        Parameters
+        ----------
+        ticket : Ticket or bytes
+            the ticket needed to access this flight
+        locations : list of Location or tuples of (host, port)
+            locations where this flight is available
+        """
+        cdef:
+            CLocation c_location = CLocation()
+
+        if isinstance(ticket, Ticket):
+            self.endpoint.ticket.ticket = ticket.ticket
+        else:
+            self.endpoint.ticket.ticket = ticket
+
+        for location in locations:
+            # Accepts Location namedtuple or tuple
+            c_location.host = tobytes(location[0])
+            c_location.port = location[1]
+            self.endpoint.locations.push_back(c_location)
+
+    @property
+    def ticket(self):
+        return Ticket(self.endpoint.ticket.ticket)
+
+    @property
+    def locations(self):
+        return [Location(frombytes(location.host), location.port)
+                for location in self.endpoint.locations]
+
+
+cdef class FlightInfo:
+    """A description of a Flight stream."""
+    cdef:
+        unique_ptr[CFlightInfo] info
+
+    def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
+                 total_records, total_bytes):
+        """Create a FlightInfo object from a schema, descriptor, and endpoints.
+
+        Parameters
+        ----------
+        schema : Schema
+            the schema of the data in this flight.
+        descriptor : FlightDescriptor
+            the descriptor for this flight.
+        endpoints : list of FlightEndpoint
+            a list of endpoints where this flight is available.
+        total_records : int
+            the total records in this flight, or -1 if unknown
+        total_bytes : int
+            the total bytes in this flight, or -1 if unknown
+        """
+        cdef:
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            vector[CFlightEndpoint] c_endpoints
+
+        for endpoint in endpoints:
+            if isinstance(endpoint, FlightEndpoint):
+                c_endpoints.push_back((<FlightEndpoint> endpoint).endpoint)
+            else:
+                raise TypeError('Endpoint {} is not instance of'
+                                ' FlightEndpoint'.format(endpoint))
+
+        check_status(CreateFlightInfo(c_schema,
+                                      descriptor.descriptor,
+                                      c_endpoints,
+                                      total_records,
+                                      total_bytes, &self.info))
+
+    @property
+    def total_records(self):
+        """The total record count of this flight, or -1 if unknown."""
+        return self.info.get().total_records()
+
+    @property
+    def total_bytes(self):
+        """The size in bytes of the data in this flight, or -1 if unknown."""
+        return self.info.get().total_bytes()
+
+    @property
+    def schema(self):
+        """The schema of the data in this flight."""
+        cdef:
+            shared_ptr[CSchema] schema
+        check_status(self.info.get().GetSchema(&schema))
+        return pyarrow_wrap_schema(schema)
+
+    @property
+    def descriptor(self):
+        """The descriptor of the data in this flight."""
+        cdef FlightDescriptor result = \
+            FlightDescriptor.__new__(FlightDescriptor)
+        result.descriptor = self.info.get().descriptor()
+        return result
+
+    @property
+    def endpoints(self):
+        """The endpoints where this flight is available."""
+        # TODO: get Cython to iterate over reference directly
+        cdef:
+            vector[CFlightEndpoint] endpoints = self.info.get().endpoints()
+            FlightEndpoint py_endpoint
+
+        result = []
+        for endpoint in endpoints:
+            py_endpoint = FlightEndpoint.__new__(FlightEndpoint)
+            py_endpoint.endpoint = endpoint
+            result.append(py_endpoint)
+        return result
+
+
+cdef class FlightRecordBatchReader(_CRecordBatchReader, _ReadPandasOption):
+    cdef dict __dict__
+
+
+cdef class FlightRecordBatchWriter(_CRecordBatchWriter):
+    pass
+
+
+cdef class FlightClient:
+    """A client to a Flight service.
+
+    Instances cannot be constructed directly; use ``FlightClient.connect``.
+    """
+    cdef:
+        # Owning pointer to the C++ client; set by connect().
+        unique_ptr[CFlightClient] client
+
+    def __init__(self):
+        # Always raises: construction goes through connect(), which
+        # allocates the object with __new__ and so never reaches here.
+        raise TypeError("Do not call {}'s constructor directly, use "
+                        "`pyarrow.flight.FlightClient.connect` instead."
+                        .format(self.__class__.__name__))
+
+    @staticmethod
+    def connect(*args):
+        """Connect to a Flight service on the given host and port.
+
+        Accepts either a single ``(host, port)`` sequence (plain tuple or
+        namedtuple) or separate ``host`` and ``port`` arguments.
+
+        Returns
+        -------
+        FlightClient
+
+        Raises
+        ------
+        TypeError
+            If called with a number of arguments other than 1 or 2.
+        """
+        cdef:
+            FlightClient result = FlightClient.__new__(FlightClient)
+            int c_port = 0
+            c_string c_host
+
+        if len(args) == 1:
+            # Accept namedtuple or plain tuple
+            c_host = tobytes(args[0][0])
+            c_port = args[0][1]
+        elif len(args) == 2:
+            # Accept separate host, port
+            c_host = tobytes(args[0])
+            c_port = args[1]
+        else:
+            raise TypeError("FlightClient.connect() takes 1 "
+                            "or 2 arguments ({} given)".format(len(args)))
+
+        # The C++ call runs with the GIL released.
+        with nogil:
+            check_status(CFlightClient.Connect(c_host, c_port, &result.client))
+
+        return result
+
+    def list_actions(self):
+        """List the actions available on a service.
+
+        Returns
+        -------
+        list of ActionType
+        """
+        cdef:
+            vector[CActionType] results
+
+        with nogil:
+            check_status(self.client.get().ListActions(&results))
+
+        result = []
+        for action_type in results:
+            py_action = ActionType()
+            py_action.action_type = action_type
+            result.append(py_action)
+
+        return result
+
+    def do_action(self, action: Action):
+        """Execute an action on a service.
+
+        This is a generator, so nothing runs until iteration starts. It
+        yields ``Result`` objects until the result stream hands back a
+        null result.
+
+        Parameters
+        ----------
+        action : Action
+        """
+        cdef:
+            unique_ptr[CResultStream] results
+        with nogil:
+            check_status(self.client.get().DoAction(action.action, &results))
+
+        while True:
+            result = Result()
+            with nogil:
+                check_status(results.get().Next(&result.result))
+                # A null result marks the end of the stream.
+                if result.result == NULL:
+                    break
+            yield result
+
+    def list_flights(self):
+        """List the flights available on a service.
+
+        This is a generator, so nothing runs until iteration starts. It
+        yields ``FlightInfo`` objects until the listing hands back a null
+        entry.
+        """
+        cdef:
+            unique_ptr[CFlightListing] listing
+            FlightInfo result
+
+        with nogil:
+            check_status(self.client.get().ListFlights(&listing))
+
+        while True:
+            result = FlightInfo.__new__(FlightInfo)
+            with nogil:
+                check_status(listing.get().Next(&result.info))
+                # A null info marks the end of the listing.
+                if result.info == NULL:
+                    break
+            yield result
+
+    def get_flight_info(self, descriptor: FlightDescriptor):
+        """Request information about an available flight.
+
+        Parameters
+        ----------
+        descriptor : FlightDescriptor
+
+        Returns
+        -------
+        FlightInfo
+        """
+        cdef:
+            FlightInfo result = FlightInfo.__new__(FlightInfo)
+
+        with nogil:
+            check_status(self.client.get().GetFlightInfo(
+                descriptor.descriptor, &result.info))
+
+        return result
+
+    def do_get(self, ticket: Ticket, schema: Schema):
+        """Request the data for a flight.
+
+        Parameters
+        ----------
+        ticket : Ticket
+        schema : Schema
+            Passed to the C++ ``DoGet`` call together with the ticket.
+
+        Returns
+        -------
+        FlightRecordBatchReader
+        """
+        cdef:
+            # TODO: introduce unwrap
+            CTicket c_ticket
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            unique_ptr[CRecordBatchReader] reader
+
+        c_ticket.ticket = ticket.ticket
+        with nogil:
+            check_status(self.client.get().DoGet(c_ticket, c_schema, &reader))
+        result = FlightRecordBatchReader()
+        # Hand ownership of the C++ reader to the Python wrapper.
+        result.reader.reset(reader.release())
+        return result
+
+    def do_put(self, descriptor: FlightDescriptor, schema: Schema):
+        """Upload data to a flight.
+
+        Parameters
+        ----------
+        descriptor : FlightDescriptor
+        schema : Schema
+            Passed to the C++ ``DoPut`` call together with the descriptor.
+
+        Returns
+        -------
+        FlightRecordBatchWriter
+        """
+        cdef:
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            unique_ptr[CRecordBatchWriter] writer
+
+        with nogil:
+            check_status(self.client.get().DoPut(
+                descriptor.descriptor, c_schema, &writer))
+        result = FlightRecordBatchWriter()
+        # Hand ownership of the C++ writer to the Python wrapper.
+        result.writer.reset(writer.release())
+        return result
+
+
+cdef class FlightDataStream:
+    """Wrapper around a C++ ``FlightDataStream``.
+
+    A server's ``do_get`` must return an instance of this class; the
+    ``_do_get`` callback then moves ``stream`` out of it.
+    """
+    cdef:
+        # Owning pointer to the C++ stream.
+        unique_ptr[CFlightDataStream] stream
+
+
+cdef class RecordBatchStream(FlightDataStream):
+    """A FlightDataStream meant to be backed by a record batch reader.
+
+    Not implemented yet: ``__init__`` ignores ``reader`` and does not
+    assign the inherited ``stream``.
+    """
+    def __init__(self, reader):
+        # TODO: we don't really expose the readers in Python.
+        pass
+
+
+cdef void _get_flight_info(void* self, CFlightDescriptor c_descriptor,
+                           unique_ptr[CFlightInfo]* info):
+    """Callback for implementing Flight servers in Python.
+
+    Placeholder: always raises NotImplementedError and never writes to
+    ``info`` or calls back into the Python server object.
+    """
+    # NOTE(review): this is a `cdef void` function with no `except`
+    # clause, so Cython presumably cannot propagate the exception to the
+    # C++ caller -- confirm how the failure is reported.
+    raise NotImplementedError("GetFlightInfo is not implemented")
+
+
+cdef void _do_put(void* self, unique_ptr[CFlightMessageReader] reader):
+    """Callback for implementing Flight servers in Python.
+
+    Wraps the incoming message reader and its descriptor in Python
+    objects and passes both to the ``do_put`` method of the server
+    object behind ``self``.
+    """
+    cdef FlightDescriptor py_descriptor = \
+        FlightDescriptor.__new__(FlightDescriptor)
+    cdef FlightRecordBatchReader batch_reader = FlightRecordBatchReader()
+
+    # Copy the descriptor first: release() below gives up the C++ reader.
+    py_descriptor.descriptor = reader.get().descriptor()
+    batch_reader.reader.reset(reader.release())
+    server = <object> self
+    server.do_put(py_descriptor, batch_reader)
+
+
+cdef void _do_get(void* self, CTicket ticket,
+                  unique_ptr[CFlightDataStream]* stream):
+    """Callback for implementing Flight servers in Python.
+
+    Wraps the C++ ticket in a Python ``Ticket``, calls the ``do_get``
+    method of the server object behind ``self`` and moves the returned
+    stream into ``stream``.
+    """
+    request = Ticket()
+    request.ticket = ticket.ticket
+    data_stream = (<object> self).do_get(request)
+    if isinstance(data_stream, FlightDataStream):
+        # Transfer ownership of the C++ stream to the caller.
+        stream[0] = move((<FlightDataStream> data_stream).stream)
+    else:
+        raise TypeError("FlightServerBase.do_get must return "
+                        "a FlightDataStream")
+
+
+cdef class FlightServerBase:
+    """A Flight service definition.
+
+    Subclass this and override the request handlers (``do_put``,
+    ``do_get``), then call ``run``.
+    """
+
+    cdef:
+        # Owning pointer to the C++ server; created by run().
+        unique_ptr[PyFlightServer] server
+
+    def run(self, port):
+        """Create the C++ server and run it on the given port.
+
+        Registers the module-level callbacks in a vtable, constructs the
+        C++ server around this object and calls its ``Run`` method with
+        the GIL released.
+
+        Parameters
+        ----------
+        port : int
+        """
+        cdef:
+            PyFlightServerVtable vtable = PyFlightServerVtable()
+            int c_port = port
+        vtable.get_flight_info = &_get_flight_info
+        vtable.do_put = &_do_put
+        vtable.do_get = &_do_get
+        self.server.reset(new PyFlightServer(self, vtable))
+        with nogil:
+            self.server.get().Run(c_port)
+
+    def get_flight_info(self, descriptor):
+        """Handle a GetFlightInfo request; override in subclasses.
+
+        NOTE: the callback registered by ``run`` currently raises
+        NotImplementedError itself and does not call this method.
+        """
+        raise NotImplementedError
+
+    def do_put(self, descriptor, reader):
+        """Handle a DoPut request; override in subclasses.
+
+        Parameters
+        ----------
+        descriptor : FlightDescriptor
+        reader : FlightRecordBatchReader
+        """
+        raise NotImplementedError
+
+    def do_get(self, ticket):
+        """Handle a DoGet request; override in subclasses.
+
+        Parameters
+        ----------
+        ticket : Ticket
+
+        Returns
+        -------
+        FlightDataStream
+            The server callback rejects any other return type.
+        """
+        # The DoGet callback calls this method; without a stub here a
+        # subclass that does not override it fails with AttributeError
+        # instead of NotImplementedError like the other handlers.
+        raise NotImplementedError
+
+    def shutdown(self):
+        """Shut the server down; does nothing if no server was created."""
+        if self.server.get() != NULL:
+            self.server.get().Shutdown()
diff --git a/python/pyarrow/flight.py b/python/pyarrow/flight.py
new file mode 100644
index 0000000..d238c37
--- /dev/null
+++ b/python/pyarrow/flight.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._flight import (FlightClient, Action, ActionType,  # noqa
+                             FlightDescriptor, FlightInfo, Ticket, Location,
+                             FlightServerBase, DescriptorType)
diff --git a/python/pyarrow/includes/common.pxd 
b/python/pyarrow/includes/common.pxd
index 1b13ff0..97e23f9 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -41,8 +41,12 @@ cdef extern from "numpy/halffloat.h":
 
 cdef extern from "arrow/api.h" namespace "arrow" nogil:
     # We can later add more of the common status factory methods as needed
-    cdef CStatus CStatus_OK "Status::OK"()
-    cdef CStatus CStatus_Invalid "Status::Invalid"()
+    cdef CStatus CStatus_OK "arrow::Status::OK"()
+    cdef CStatus CStatus_Invalid "arrow::Status::Invalid"()
+    cdef CStatus CStatus_NotImplemented \
+        "arrow::Status::NotImplemented"(const c_string& msg)
+    cdef CStatus CStatus_UnknownError \
+        "arrow::Status::UnknownError"(const c_string& msg)
 
     cdef cppclass CStatus "arrow::Status":
         CStatus()
diff --git a/python/pyarrow/includes/libarrow_flight.pxd 
b/python/pyarrow/includes/libarrow_flight.pxd
new file mode 100644
index 0000000..4d64f52
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from libcpp.functional cimport function
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
+    # Declarations of the C++ Flight types used by the Python bindings.
+    # Each C-prefixed name maps to the arrow::flight type named in its
+    # string literal.
+    cdef cppclass CActionType" arrow::flight::ActionType":
+        c_string type
+        c_string description
+
+    cdef cppclass CAction" arrow::flight::Action":
+        c_string type
+        shared_ptr[CBuffer] body
+
+    cdef cppclass CResult" arrow::flight::Result":
+        shared_ptr[CBuffer] body
+
+    cdef cppclass CResultStream" arrow::flight::ResultStream":
+        CStatus Next(unique_ptr[CResult]* result)
+
+    cdef cppclass CDescriptorType \
+            " arrow::flight::FlightDescriptor::DescriptorType":
+        bint operator==(CDescriptorType)
+
+    CDescriptorType CDescriptorTypeUnknown\
+        " arrow::flight::FlightDescriptor::UNKNOWN"
+    CDescriptorType CDescriptorTypePath\
+        " arrow::flight::FlightDescriptor::PATH"
+    CDescriptorType CDescriptorTypeCmd\
+        " arrow::flight::FlightDescriptor::CMD"
+
+    cdef cppclass CFlightDescriptor" arrow::flight::FlightDescriptor":
+        CDescriptorType type
+        c_string cmd
+        vector[c_string] path
+
+    cdef cppclass CTicket" arrow::flight::Ticket":
+        CTicket()
+        c_string ticket
+
+    cdef cppclass CLocation" arrow::flight::Location":
+        CLocation()
+
+        c_string host
+        int32_t port
+
+    cdef cppclass CFlightEndpoint" arrow::flight::FlightEndpoint":
+        CFlightEndpoint()
+
+        CTicket ticket
+        vector[CLocation] locations
+
+    # NOTE(review): total_records/total_bytes are declared unsigned here,
+    # while the Python properties document -1 for "unknown" -- confirm the
+    # signedness against the C++ header.
+    cdef cppclass CFlightInfo" arrow::flight::FlightInfo":
+        uint64_t total_records()
+        uint64_t total_bytes()
+        CStatus GetSchema(shared_ptr[CSchema]* out)
+        CFlightDescriptor& descriptor()
+        const vector[CFlightEndpoint]& endpoints()
+
+    cdef cppclass CFlightListing" arrow::flight::FlightListing":
+        CStatus Next(unique_ptr[CFlightInfo]* info)
+
+    cdef cppclass CFlightMessageReader \
+            " arrow::flight::FlightMessageReader"(CRecordBatchReader):
+        CFlightDescriptor& descriptor()
+
+    cdef cppclass CFlightDataStream" arrow::flight::FlightDataStream":
+        pass
+
+    cdef cppclass CRecordBatchStream \
+            " arrow::flight::RecordBatchStream"(CFlightDataStream):
+        CRecordBatchStream(shared_ptr[CRecordBatchReader]& reader)
+
+    # Client API; every call reports failure through the returned CStatus
+    # and delivers its result through the trailing pointer argument.
+    cdef cppclass CFlightClient" arrow::flight::FlightClient":
+        @staticmethod
+        CStatus Connect(const c_string& host, int port,
+                        unique_ptr[CFlightClient]* client)
+
+        CStatus DoAction(CAction& action, unique_ptr[CResultStream]* results)
+        CStatus ListActions(vector[CActionType]* actions)
+
+        CStatus ListFlights(unique_ptr[CFlightListing]* listing)
+        CStatus GetFlightInfo(CFlightDescriptor& descriptor,
+                              unique_ptr[CFlightInfo]* info)
+
+        CStatus DoGet(CTicket& ticket, shared_ptr[CSchema]& schema,
+                      unique_ptr[CRecordBatchReader]* stream)
+        CStatus DoPut(CFlightDescriptor& descriptor,
+                      shared_ptr[CSchema]& schema,
+                      unique_ptr[CRecordBatchWriter]* stream)
+
+
+# Callbacks for implementing Flight servers
+# Use typedef to emulate syntax for std::function<void(...)>
+# The first argument of each callback is the Python server object.
+# NOTE(review): the callbacks defined in _flight.pyx take `void*` and
+# by-value arguments rather than `object` and const references -- confirm
+# the conversion to std::function is intended.
+ctypedef void cb_get_flight_info(object, const CFlightDescriptor&,
+                                 unique_ptr[CFlightInfo]*)
+ctypedef void cb_do_put(object, unique_ptr[CFlightMessageReader])
+ctypedef void cb_do_get(object, const CTicket&,
+                        unique_ptr[CFlightDataStream]*)
+
+cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil:
+    # Table of callbacks handed to PyFlightServer; FlightServerBase.run
+    # fills in all three members before constructing the server.
+    cdef cppclass PyFlightServerVtable:
+        PyFlightServerVtable()
+        function[cb_get_flight_info] get_flight_info
+        function[cb_do_put] do_put
+        function[cb_do_get] do_get
+
+    # C++ server constructed from a Python server object and a vtable.
+    cdef cppclass PyFlightServer:
+        PyFlightServer(object server, PyFlightServerVtable vtable)
+        void Run(int port)
+        void Shutdown()
+
+    # Builds a FlightInfo from its parts and stores it in `out`.
+    cdef CStatus CreateFlightInfo" arrow::py::flight::CreateFlightInfo"(
+        shared_ptr[CSchema] schema,
+        CFlightDescriptor& descriptor,
+        vector[CFlightEndpoint] endpoints,
+        uint64_t total_records,
+        uint64_t total_bytes,
+        unique_ptr[CFlightInfo]* out)
+
+# std::move, declared only for unique_ptr[CFlightDataStream]; used to
+# transfer ownership of a stream out of a FlightDataStream wrapper.
+cdef extern from "<utility>" namespace "std":
+    unique_ptr[CFlightDataStream] move(unique_ptr[CFlightDataStream])
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 137d526..e857302 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -148,26 +148,14 @@ cdef class MessageReader:
 # ----------------------------------------------------------------------
 # File and stream readers and writers
 
-cdef class _RecordBatchWriter:
-    cdef:
-        shared_ptr[CRecordBatchWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self):
-        pass
-
-    def __dealloc__(self):
-        pass
+cdef class _CRecordBatchWriter:
+    """The base RecordBatchWriter wrapper.
 
-    def _open(self, sink, Schema schema):
-        get_writer(sink, &self.sink)
+    Provides common implementations of convenience methods. Should not
+    be instantiated directly by user code.
+    """
 
-        with nogil:
-            check_status(
-                CRecordBatchStreamWriter.Open(self.sink.get(),
-                                              schema.sp_schema,
-                                              &self.writer))
+    # cdef block is in lib.pxd
 
     def write(self, table_or_batch):
         """
@@ -222,6 +210,33 @@ cdef class _RecordBatchWriter:
         with nogil:
             check_status(self.writer.get().Close())
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+
+cdef class _RecordBatchStreamWriter(_CRecordBatchWriter):
+    cdef:
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self):
+        pass
+
+    def __dealloc__(self):
+        pass
+
+    def _open(self, sink, Schema schema):
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(
+                CRecordBatchStreamWriter.Open(self.sink.get(),
+                                              schema.sp_schema,
+                                              &self.writer))
+
 
 cdef _get_input_stream(object source, shared_ptr[InputStream]* out):
     cdef:
@@ -237,24 +252,14 @@ cdef _get_input_stream(object source, 
shared_ptr[InputStream]* out):
     out[0] = <shared_ptr[InputStream]> file_handle
 
 
-cdef class _RecordBatchReader:
-    cdef:
-        shared_ptr[CRecordBatchReader] reader
-        shared_ptr[InputStream] in_stream
-
-    cdef readonly:
-        Schema schema
-
-    def __cinit__(self):
-        pass
+cdef class _CRecordBatchReader:
+    """The base RecordBatchReader wrapper.
 
-    def _open(self, source):
-        _get_input_stream(source, &self.in_stream)
-        with nogil:
-            check_status(CRecordBatchStreamReader.Open(
-                self.in_stream.get(), &self.reader))
+    Provides common implementations of convenience methods. Should not
+    be instantiated directly by user code.
+    """
 
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+    # cdef block is in lib.pxd
 
     def __iter__(self):
         while True:
@@ -291,7 +296,26 @@ cdef class _RecordBatchReader:
         return pyarrow_wrap_table(table)
 
 
-cdef class _RecordBatchFileWriter(_RecordBatchWriter):
+cdef class _RecordBatchStreamReader(_CRecordBatchReader):
+    cdef:
+        shared_ptr[InputStream] in_stream
+
+    cdef readonly:
+        Schema schema
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source):
+        _get_input_stream(source, &self.in_stream)
+        with nogil:
+            check_status(CRecordBatchStreamReader.Open(
+                self.in_stream.get(), &self.reader))
+
+        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
+
+
+cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
 
     def _open(self, sink, Schema schema):
         get_writer(sink, &self.sink)
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index a79cafe..78bb347 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -45,7 +45,7 @@ class _ReadPandasOption(object):
         return table.to_pandas(**options)
 
 
-class RecordBatchStreamReader(lib._RecordBatchReader, _ReadPandasOption):
+class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption):
     """
     Reader for the Arrow streaming binary format
 
@@ -58,7 +58,7 @@ class RecordBatchStreamReader(lib._RecordBatchReader, 
_ReadPandasOption):
         self._open(source)
 
 
-class RecordBatchStreamWriter(lib._RecordBatchWriter):
+class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
     """
     Writer for the Arrow streaming binary format
 
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 8cd8f40..6f14e76 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -383,6 +383,16 @@ cdef class NativeFile:
     cdef shared_ptr[OutputStream] get_output_stream(self) except *
 
 
+cdef class _CRecordBatchWriter:
+    cdef:
+        shared_ptr[CRecordBatchWriter] writer
+
+
+cdef class _CRecordBatchReader:
+    cdef:
+        shared_ptr[CRecordBatchReader] reader
+
+
 cdef get_input_stream(object source, c_bool use_memory_map,
                       shared_ptr[InputStream]* reader)
 cdef get_reader(object source, c_bool use_memory_map,
diff --git a/python/requirements.txt b/python/requirements.txt
index 3a23d1d..ba67f6b 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,3 +1,4 @@
 six>=1.0.0
 numpy>=1.14
 futures; python_version < "3.2"
+enum34 >= 1.1.6; python_version < "3.4"
diff --git a/python/setup.py b/python/setup.py
index e4a7936..0fc89c0 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -99,6 +99,7 @@ class build_ext(_build_ext):
                      ('boost-namespace=', None,
                       'namespace of boost (default: boost)'),
                      ('with-cuda', None, 'build the Cuda extension'),
+                     ('with-flight', None, 'build the Flight extension'),
                      ('with-parquet', None, 'build the Parquet extension'),
                      ('with-static-parquet', None, 'link parquet statically'),
                      ('with-static-boost', None, 'link boost statically'),
@@ -136,6 +137,8 @@ class build_ext(_build_ext):
 
         self.with_cuda = strtobool(
             os.environ.get('PYARROW_WITH_CUDA', '0'))
+        self.with_flight = strtobool(
+            os.environ.get('PYARROW_WITH_FLIGHT', '0'))
         self.with_parquet = strtobool(
             os.environ.get('PYARROW_WITH_PARQUET', '0'))
         self.with_static_parquet = strtobool(
@@ -161,6 +164,7 @@ class build_ext(_build_ext):
         'lib',
         '_csv',
         '_cuda',
+        '_flight',
         '_parquet',
         '_orc',
         '_plasma',
@@ -200,6 +204,8 @@ class build_ext(_build_ext):
                 cmake_options += ['-G', self.cmake_generator]
             if self.with_cuda:
                 cmake_options.append('-DPYARROW_BUILD_CUDA=on')
+            if self.with_flight:
+                cmake_options.append('-DPYARROW_BUILD_FLIGHT=on')
             if self.with_parquet:
                 cmake_options.append('-DPYARROW_BUILD_PARQUET=on')
             if self.with_static_parquet:
@@ -344,6 +350,8 @@ class build_ext(_build_ext):
                 move_shared_libs(build_prefix, build_lib, "arrow_python")
                 if self.with_cuda:
                     move_shared_libs(build_prefix, build_lib, "arrow_gpu")
+                if self.with_flight:
+                    move_shared_libs(build_prefix, build_lib, "arrow_flight")
                 if self.with_plasma:
                     move_shared_libs(build_prefix, build_lib, "plasma")
                 if self.with_gandiva:
@@ -384,6 +392,8 @@ class build_ext(_build_ext):
             return True
         if name == '_orc' and not self.with_orc:
             return True
+        if name == '_flight' and not self.with_flight:
+            return True
         if name == '_cuda' and not self.with_cuda:
             return True
         if name == 'gandiva' and not self.with_gandiva:
@@ -530,7 +540,8 @@ class BinaryDistribution(Distribution):
 install_requires = (
     'numpy >= 1.14',
     'six >= 1.0.0',
-    'futures; python_version < "3.2"'
+    'futures; python_version < "3.2"',
+    'enum34 >= 1.1.6; python_version < "3.4"',
 )
 
 

Reply via email to