This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 39243ff ARROW-1409: [Format] Remove page id from Buffer metadata, increment metadata version number 39243ff is described below commit 39243ffaf5eb1d1f2a748ea1ec2b36658ba7f3d7 Author: Wes McKinney <wes.mckin...@twosigma.com> AuthorDate: Mon Oct 30 14:59:47 2017 -0400 ARROW-1409: [Format] Remove page id from Buffer metadata, increment metadata version number This is a breaking metadata change per discussion on the mailing list. I expect this kind of truly breaking change to be exceedingly rare going forward, and when we make a 1.0.0 release we should document expectations around metadata / memory format stability. This could be made backwards compatible with some effort (we would have to add `RecordBatchV3` and `BufferV3` types). Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #1225 from wesm/ARROW-1409 and squashes the following commits: 582fad90 [Wes McKinney] Disable JS in Travis CI for now 845f290f [Wes McKinney] Bump metadata version in Java, add check for V4 e2150c19 [Wes McKinney] Remove page id from Buffer metadata, increment metadata version number --- .travis.yml | 15 +++++++------ cpp/src/arrow/ipc/ipc-read-write-test.cc | 2 +- cpp/src/arrow/ipc/message.cc | 15 +------------ cpp/src/arrow/ipc/message.h | 14 +++++++++++- cpp/src/arrow/ipc/metadata-internal.cc | 25 ++++++++++++++++++++-- cpp/src/arrow/ipc/metadata-internal.h | 10 ++++----- cpp/src/arrow/ipc/reader.cc | 15 +------------ cpp/src/arrow/ipc/writer.cc | 12 +---------- format/Schema.fbs | 14 +++++++----- .../apache/arrow/vector/schema/ArrowBuffer.java | 16 +++----------- .../arrow/vector/schema/ArrowRecordBatch.java | 2 +- .../arrow/vector/stream/MessageSerializer.java | 6 +++++- js/src/format/Schema_generated.ts | 20 ++++------------- 13 files changed, 75 insertions(+), 91 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6419548..52d7a5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -112,13 +112,14 @@ 
matrix: - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh - - language: node_js - os: linux - node_js: node - before_script: - - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh - script: - - $TRAVIS_BUILD_DIR/ci/travis_script_js.sh + # TODO(wesm): Re-enable after issues in ARROW-1409 resolved + # - language: node_js + # os: linux + # node_js: node + # before_script: + # - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh + # script: + # - $TRAVIS_BUILD_DIR/ci/travis_script_js.sh - compiler: gcc language: cpp os: linux diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index adf34a9..6f2f5cf 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -243,7 +243,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) { std::unique_ptr<Message> message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - ASSERT_EQ(MetadataVersion::V3, message->metadata_version()); + ASSERT_EQ(MetadataVersion::V4, message->metadata_version()); } TEST_P(TestIpcRoundTrip, SliceRoundTrip) { diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 0dd5c72..21d6a69 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -67,20 +67,7 @@ class Message::MessageImpl { } MetadataVersion version() const { - switch (message_->version()) { - case flatbuf::MetadataVersion_V1: - // Arrow 0.1 - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - // Arrow 0.2 - return MetadataVersion::V2; - case flatbuf::MetadataVersion_V3: - // Arrow >= 0.3 - return MetadataVersion::V3; - // Add cases as other versions become available - default: - return MetadataVersion::V3; - } + return internal::GetMetadataVersion(message_->version()); } const void* header() const { return message_->header(); } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index a1b6c07..495474e 100644 --- 
a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -42,7 +42,19 @@ class RandomAccessFile; namespace ipc { -enum class MetadataVersion : char { V1, V2, V3 }; +enum class MetadataVersion : char { + /// 0.1.0 + V1, + + /// 0.2.0 + V2, + + /// 0.3.0 to 0.7.1 + V3, + + /// >= 0.8.0 + V4 +}; // ARROW-109: We set this number arbitrarily to help catch user mistakes. For // deeply nested schemas, it is expected the user will indicate explicitly the diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index ad00cfb..f04e9b0 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -33,6 +33,7 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/Tensor_generated.h" #include "arrow/ipc/dictionary.h" +#include "arrow/ipc/message.h" #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/tensor.h" @@ -57,6 +58,26 @@ using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>; using Offset = flatbuffers::Offset<void>; using FBString = flatbuffers::Offset<flatbuffers::String>; +MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version) { + switch (version) { + case flatbuf::MetadataVersion_V1: + // Arrow 0.1 + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + // Arrow 0.2 + return MetadataVersion::V2; + case flatbuf::MetadataVersion_V3: + // Arrow 0.3 to 0.7.1 + return MetadataVersion::V3; + case flatbuf::MetadataVersion_V4: + // Arrow >= 0.8 + return MetadataVersion::V4; + // Add cases as other versions become available + default: + return MetadataVersion::V4; + } +} + static Status IntFromFlatbuffer(const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) { if (int_data->bitWidth() > 64) { @@ -700,7 +721,7 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers, for (size_t i = 0; i < buffers.size(); ++i) { const BufferMetadata& buffer = buffers[i]; - 
fb_buffers.emplace_back(buffer.page, buffer.offset, buffer.length); + fb_buffers.emplace_back(buffer.offset, buffer.length); } *out = fbb.CreateVectorOfStructs(fb_buffers); return Status::OK(); @@ -751,7 +772,7 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, auto fb_shape = fbb.CreateVector(dims); auto fb_strides = fbb.CreateVector(tensor.strides()); int64_t body_length = tensor.data()->size(); - flatbuf::Buffer buffer(-1, buffer_start_offset, body_length); + flatbuf::Buffer buffer(buffer_start_offset, body_length); TensorOffset fb_tensor = flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 309e758..380f3c9 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -27,6 +27,7 @@ #include "arrow/ipc/Schema_generated.h" #include "arrow/ipc/dictionary.h" +#include "arrow/ipc/message.h" namespace arrow { @@ -48,10 +49,12 @@ namespace ipc { namespace internal { static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = - flatbuf::MetadataVersion_V3; + flatbuf::MetadataVersion_V4; static constexpr flatbuf::MetadataVersion kMinMetadataVersion = - flatbuf::MetadataVersion_V3; + flatbuf::MetadataVersion_V4; + +MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version); static constexpr const char* kArrowMagicBytes = "ARROW1"; @@ -62,9 +65,6 @@ struct FieldMetadata { }; struct BufferMetadata { - /// The shared memory page id where to find this. 
Set to -1 if unused - int32_t page; - /// The relative offset into the memory page to the starting byte of the buffer int64_t offset; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 50eb903..8e10d7d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -550,20 +550,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { int num_record_batches() const { return footer_->recordBatches()->size(); } MetadataVersion version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - // Arrow 0.1 - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - // Arrow 0.2 - return MetadataVersion::V2; - case flatbuf::MetadataVersion_V3: - // Arrow 0.3 - return MetadataVersion::V3; - // Add cases as other versions become available - default: - return MetadataVersion::V3; - } + return internal::GetMetadataVersion(footer_->version()); } FileBlock record_batch(int i) const { diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 279a695..5598cc6 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -149,8 +149,6 @@ class RecordBatchSerializer : public ArrayVisitor { buffer_meta_.reserve(buffers_.size()); - const int32_t kNoPageId = -1; - // Construct the buffer metadata for the record batch header for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); @@ -163,15 +161,7 @@ class RecordBatchSerializer : public ArrayVisitor { padding = BitUtil::RoundUpToMultipleOf8(size) - size; } - // TODO(wesm): We currently have no notion of shared memory page id's, - // but we've included it in the metadata IDL for when we have it in the - // future. Use page = -1 for now - // - // Note that page ids are a bespoke notion for Arrow and not a feature we - // are using from any OS-level shared memory. 
The thought is that systems - // may (in the future) associate integer page id's with physical memory - // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back({kNoPageId, offset, size + padding}); + buffer_meta_.push_back({offset, size + padding}); offset += size + padding; } diff --git a/format/Schema.fbs b/format/Schema.fbs index 186f8e3..6021e92 100644 --- a/format/Schema.fbs +++ b/format/Schema.fbs @@ -20,9 +20,17 @@ namespace org.apache.arrow.flatbuf; enum MetadataVersion:short { + /// 0.1.0 V1, + + /// 0.2.0 V2, - V3 + + /// 0.3.0 -> 0.7.1 + V3, + + /// >= 0.8.0 + V4 } /// These are stored in the flatbuffer in the Type union below @@ -293,10 +301,6 @@ enum Endianness:short { Little, Big } /// ---------------------------------------------------------------------- /// A Buffer represents a single contiguous memory segment struct Buffer { - /// The shared memory page id where this buffer is located. Currently this is - /// not used - page: int; - /// The relative offset into the shared memory page where the bytes for this /// buffer starts offset: long; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java index d8c9e30..4e0187e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java @@ -24,21 +24,15 @@ import com.google.flatbuffers.FlatBufferBuilder; public class ArrowBuffer implements FBSerializable { - private int page; private long offset; private long size; - public ArrowBuffer(int page, long offset, long size) { + public ArrowBuffer(long offset, long size) { super(); - this.page = page; this.offset = offset; this.size = size; } - public int getPage() { - return page; - } - public long getOffset() { return offset; } @@ -52,7 +46,6 @@ public class ArrowBuffer implements FBSerializable { final int 
prime = 31; int result = 1; result = prime * result + (int) (offset ^ (offset >>> 32)); - result = prime * result + page; result = prime * result + (int) (size ^ (size >>> 32)); return result; } @@ -72,9 +65,6 @@ public class ArrowBuffer implements FBSerializable { if (offset != other.offset) { return false; } - if (page != other.page) { - return false; - } if (size != other.size) { return false; } @@ -83,12 +73,12 @@ public class ArrowBuffer implements FBSerializable { @Override public int writeTo(FlatBufferBuilder builder) { - return Buffer.createBuffer(builder, page, offset, size); + return Buffer.createBuffer(builder, offset, size); } @Override public String toString() { - return "ArrowBuffer [page=" + page + ", offset=" + offset + ", size=" + size + "]"; + return "ArrowBuffer [offset=" + offset + ", size=" + size + "]"; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java index c842d4c..bf0967a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java @@ -72,7 +72,7 @@ public class ArrowRecordBatch implements ArrowMessage { for (ArrowBuf arrowBuf : buffers) { arrowBuf.retain(); long size = arrowBuf.readableBytes(); - arrowBuffers.add(new ArrowBuffer(0, offset, size)); + arrowBuffers.add(new ArrowBuffer(offset, size)); LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", offset, size)); offset += size; if (alignBuffers && offset % 8 != 0) { // align on 8 byte boundaries diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index f69aa41..c397cec 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ 
b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -385,6 +385,10 @@ public class MessageSerializer { throw new IOException("Cannot currently deserialize record batches over 2GB"); } + if (message.version() != MetadataVersion.V4) { + throw new IOException("Received metadata with an incompatible version number"); + } + switch (message.headerType()) { case MessageHeader.RecordBatch: return deserializeRecordBatch(in, message, alloc); @@ -409,7 +413,7 @@ public class MessageSerializer { Message.startMessage(builder); Message.addHeaderType(builder, headerType); Message.addHeader(builder, headerOffset); - Message.addVersion(builder, MetadataVersion.V3); + Message.addVersion(builder, MetadataVersion.V4); Message.addBodyLength(builder, bodyLength); builder.finish(Message.endMessage(builder)); return builder.dataBuffer(); diff --git a/js/src/format/Schema_generated.ts b/js/src/format/Schema_generated.ts index 65493b7..c5b3e50 100644 --- a/js/src/format/Schema_generated.ts +++ b/js/src/format/Schema_generated.ts @@ -2028,23 +2028,13 @@ export namespace org.apache.arrow.flatbuf { } /** - * The shared memory page id where this buffer is located. 
Currently this is - * not used - * - * @returns {number} - */ - page(): number { - return this.bb.readInt32(this.bb_pos); - } - - /** * The relative offset into the shared memory page where the bytes for this * buffer starts * * @returns {flatbuffers.Long} */ offset(): flatbuffers.Long { - return this.bb.readInt64(this.bb_pos + 8); + return this.bb.readInt64(this.bb_pos); } /** @@ -2054,7 +2044,7 @@ export namespace org.apache.arrow.flatbuf { * @returns {flatbuffers.Long} */ length(): flatbuffers.Long { - return this.bb.readInt64(this.bb_pos + 16); + return this.bb.readInt64(this.bb_pos + 8); } /** @@ -2064,12 +2054,10 @@ export namespace org.apache.arrow.flatbuf { * @param {flatbuffers.Long} length * @returns {flatbuffers.Offset} */ - static createBuffer(builder: flatbuffers.Builder, page: number, offset: flatbuffers.Long, length: flatbuffers.Long): flatbuffers.Offset { - builder.prep(8, 24); + static createBuffer(builder: flatbuffers.Builder, offset: flatbuffers.Long, length: flatbuffers.Long): flatbuffers.Offset { + builder.prep(8, 16); builder.writeInt64(length); builder.writeInt64(offset); - builder.pad(4); - builder.writeInt32(page); return builder.offset(); } -- To stop receiving notification emails like this one, please contact ['"commits@arrow.apache.org" <commits@arrow.apache.org>'].