[
https://issues.apache.org/jira/browse/ARROW-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425980#comment-16425980
]
ASF GitHub Bot commented on ARROW-2308:
---------------------------------------
pcmoritz closed pull request #1802: ARROW-2308: [Python] Make deserialized
numpy arrays 64-byte aligned.
URL: https://github.com/apache/arrow/pull/1802
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fd747563a..896221e71 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -28,6 +28,7 @@
#include "arrow/ipc/Message_generated.h"
#include "arrow/ipc/Schema_generated.h"
#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
@@ -207,7 +208,8 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
return Message::ReadFrom(metadata, file, message);
}
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message,
+ bool aligned) {
int32_t message_length = 0;
int64_t bytes_read = 0;
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
@@ -233,6 +235,16 @@ Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
return Status::Invalid(ss.str());
}
+ // If requested, align the file before reading the message.
+ if (aligned) {
+ int64_t offset;
+ RETURN_NOT_OK(file->Tell(&offset));
+ int64_t aligned_offset = PaddedLength(offset);
+ int64_t num_extra_bytes = aligned_offset - offset;
+ std::shared_ptr<Buffer> dummy_buffer;
+ RETURN_NOT_OK(file->Read(num_extra_bytes, &dummy_buffer));
+ }
+
return Message::ReadFrom(metadata, file, message);
}
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 159b39a81..4e0089b7d 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -182,7 +182,8 @@ Status ReadMessage(const int64_t offset, const int32_t metadata_length,
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
ARROW_EXPORT
-Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
+Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message,
+ bool aligned = false);
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index cc3b6e557..aefd491a3 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -684,8 +684,9 @@ Status RecordBatchFileReader::ReadRecordBatch(int i,
}
static Status ReadContiguousPayload(io::InputStream* file,
- std::unique_ptr<Message>* message) {
- RETURN_NOT_OK(ReadMessage(file, message));
+ std::unique_ptr<Message>* message,
+ bool aligned = false) {
+ RETURN_NOT_OK(ReadMessage(file, message, aligned));
if (*message == nullptr) {
return Status::Invalid("Unable to read metadata at offset");
}
@@ -715,7 +716,7 @@ Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
RETURN_NOT_OK(file->Seek(offset));
std::unique_ptr<Message> message;
- RETURN_NOT_OK(ReadContiguousPayload(file, &message));
+ RETURN_NOT_OK(ReadContiguousPayload(file, &message, true /* aligned */));
return ReadTensor(*message, out);
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 078efe1b0..86f2ed1d6 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -618,6 +618,9 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
if (tensor.is_contiguous()) {
RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length,
body_length));
+ // It's important to align the stream position again so that the tensor data
+ // is aligned.
+ RETURN_NOT_OK(AlignStreamPosition(dst));
auto data = tensor.data();
if (data) {
*body_length = data->size();
@@ -630,6 +633,9 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
Tensor dummy(tensor.type(), tensor.data(), tensor.shape());
const auto& type = static_cast<const FixedWidthType&>(*tensor.type());
RETURN_NOT_OK(WriteTensorHeader(dummy, dst, metadata_length, body_length));
+ // It's important to align the stream position again so that the tensor data
+ // is aligned.
+ RETURN_NOT_OK(AlignStreamPosition(dst));
const int elem_size = type.bit_width() / 8;
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 7ddf3958e..16b477bc9 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -691,3 +691,26 @@ def test_path_objects(tmpdir):
pa.serialize_to(obj, p)
res = pa.deserialize_from(p, None)
assert res == obj
+
+
+def test_tensor_alignment():
+ # Deserialized numpy arrays should be 64-byte aligned.
+ x = np.random.normal(size=(10, 20, 30))
+ y = pa.deserialize(pa.serialize(x).to_buffer())
+ assert y.ctypes.data % 64 == 0
+
+ xs = [np.random.normal(size=i) for i in range(100)]
+ ys = pa.deserialize(pa.serialize(xs).to_buffer())
+ for y in ys:
+ assert y.ctypes.data % 64 == 0
+
+ xs = [np.random.normal(size=i * (1,)) for i in range(20)]
+ ys = pa.deserialize(pa.serialize(xs).to_buffer())
+ for y in ys:
+ assert y.ctypes.data % 64 == 0
+
+ xs = [np.random.normal(size=i * (5,)) for i in range(1, 8)]
+ xs = [xs[i][(i + 1) * (slice(1, 3),)] for i in range(len(xs))]
+ ys = pa.deserialize(pa.serialize(xs).to_buffer())
+ for y in ys:
+ assert y.ctypes.data % 64 == 0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Serialized tensor data should be 64-byte aligned.
> -------------------------------------------------
>
> Key: ARROW-2308
> URL: https://issues.apache.org/jira/browse/ARROW-2308
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Python
> Reporter: Robert Nishihara
> Priority: Major
> Labels: pull-request-available
>
> See [https://github.com/ray-project/ray/issues/1658] for an example of this
> issue. Non-aligned data can trigger a copy when fed into TensorFlow and
> things like that.
> {code}
> import pyarrow as pa
> import numpy as np
> x = np.zeros(10)
> y = pa.deserialize(pa.serialize(x).to_buffer())
> x.ctypes.data % 64 # 0 (it starts out aligned)
> y.ctypes.data % 64 # 48 (it is no longer aligned)
> {code}
> It should be possible to fix this by calling something like
> {{RETURN_NOT_OK(AlignStreamPosition(dst));}} before writing the array data.
> Note that we already do this before writing the tensor header, but the tensor
> header is not necessarily a multiple of 64 bytes, so the subsequent data can
> be unaligned.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)