Repository: arrow Updated Branches: refs/heads/master 7c1fef51c -> 6c352e205
ARROW-822: [Python] StreamWriter Wrapper for Socket and File-like Objects without tell() Added a wrapper for StreamWriter to implement the required tell() method so that python sockets and file-like objects can be used as sinks. The tell() method will report the position by starting at 0 when the StreamWriter is created and incrementing by number of bytes after each write. Added unittests that use local socket as the source/sink for streaming. Author: Bryan Cutler <[email protected]> Closes #569 from BryanCutler/pyarrow-stream-writer-socket-ARROW-822 and squashes the following commits: 6cdec4f [Bryan Cutler] Removed StreamWriter wrapper and put position handling in PyStreamWriter instead 2bd669f [Bryan Cutler] Added StreamSinkWrapper to ensure stream sink has tell() method, added unittest for StreamWriter and StreamReader over local socket Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6c352e20 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6c352e20 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6c352e20 Branch: refs/heads/master Commit: 6c352e2057d5f9a442c1ebf0d35c716f475fd343 Parents: 7c1fef5 Author: Bryan Cutler <[email protected]> Authored: Thu Apr 20 21:42:20 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Thu Apr 20 21:42:20 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/python/io.cc | 7 ++-- cpp/src/arrow/python/io.h | 1 + python/pyarrow/tests/test_ipc.py | 79 +++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/6c352e20/cpp/src/arrow/python/io.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index ba82a45..327e8fe 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -189,7 +189,7 @@ bool PyReadableFile::supports_zero_copy() const { // ---------------------------------------------------------------------- // Output stream -PyOutputStream::PyOutputStream(PyObject* file) { +PyOutputStream::PyOutputStream(PyObject* file) : position_(0) { file_.reset(new PythonFile(file)); } @@ -201,12 +201,13 @@ Status PyOutputStream::Close() { } Status PyOutputStream::Tell(int64_t* position) { - PyAcquireGIL lock; - return file_->Tell(position); + *position = position_; + return Status::OK(); } Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { PyAcquireGIL lock; + position_ += nbytes; return file_->Write(data, nbytes); } http://git-wip-us.apache.org/repos/asf/arrow/blob/6c352e20/cpp/src/arrow/python/io.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index bf14cd6..ebd4c5a 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -82,6 +82,7 @@ class ARROW_EXPORT PyOutputStream : public io::OutputStream { private: std::unique_ptr<PythonFile> file_; + int64_t position_; }; // A zero-copy reader backed by a PyBuffer object http://git-wip-us.apache.org/repos/asf/arrow/blob/6c352e20/python/pyarrow/tests/test_ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 31d418d..81213ed 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -17,6 +17,8 @@ import io import pytest +import socket +import threading import numpy as np @@ -126,6 +128,83 @@ class TestStream(MessagingTest, unittest.TestCase): assert result.equals(expected) +class TestSocket(MessagingTest, unittest.TestCase): + + class StreamReaderServer(threading.Thread): + + def init(self, do_read_all): + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.bind(('127.0.0.1', 0)) + self._sock.listen(1) + host, port = self._sock.getsockname() + self._do_read_all = do_read_all + self._schema = None + self._batches = [] + self._table = None + return port + + def run(self): + connection, client_address = self._sock.accept() + try: + source = connection.makefile(mode='rb') + reader = pa.StreamReader(source) + self._schema = reader.schema + if self._do_read_all: + self._table = reader.read_all() + else: + for i, batch in enumerate(reader): + self._batches.append(batch) + finally: + connection.close() + + def get_result(self): + return(self._schema, self._table if self._do_read_all else self._batches) + + def setUp(self): + # NOTE: must start and stop server in test + pass + + def start_server(self, do_read_all): + self._server = TestSocket.StreamReaderServer() + port = self._server.init(do_read_all) + self._server.start() + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect(('127.0.0.1', port)) + self.sink = self._get_sink() + + def stop_and_get_result(self): + import struct + self.sink.write(struct.pack('i', 0)) + self.sink.flush() + self._sock.close() + self._server.join() + return self._server.get_result() + + def _get_sink(self): + return self._sock.makefile(mode='wb') + + def _get_writer(self, sink, schema): + return pa.StreamWriter(sink, schema) + + def test_simple_roundtrip(self): + self.start_server(do_read_all=False) + writer_batches = self.write_batches() + reader_schema, reader_batches = self.stop_and_get_result() + + assert reader_schema.equals(writer_batches[0].schema) + assert len(reader_batches) == len(writer_batches) + for i, batch in enumerate(writer_batches): + assert reader_batches[i].equals(batch) + + def test_read_all(self): + self.start_server(do_read_all=True) + writer_batches = self.write_batches() + _, result = self.stop_and_get_result() + + expected = pa.Table.from_batches(writer_batches) + assert result.equals(expected) + + class TestInMemoryFile(TestFile): def _get_sink(self):
