bkietz commented on code in PR #41180:
URL: https://github.com/apache/arrow/pull/41180#discussion_r1574899936


##########
docs/source/format/DissociatedIPC.rst:
##########
@@ -0,0 +1,376 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _dissociated-ipc:
+
+========================
+Dissociated IPC Protocol
+========================
+
+.. warning::
+
+    Experimental: The Dissociated IPC Protocol is experimental in its current
+    form. Based on feedback and usage the protocol definition may change until
+    it is fully standardized.
+
+Rationale
+=========
+
+The :ref:`Arrow IPC format <format-ipc>` describes a protocol for transferring
+Arrow data as a stream of record batches. This protocol expects a continuous
+stream of bytes divided into discrete messages (using a length prefix and
+continuation indicator). Each discrete message consists of two portions:
+
+* A `Flatbuffers`_ header message
+* A series of bytes consisting of the flattened and packed body buffers (some
+  message types, like Schema messages, do not have this section)
+  - This is referred to as the *message body* in the IPC format spec.
+
+For most cases, the existing IPC format as it currently exists is sufficiently efficient:
+
+* Receiving data in the IPC format allows zero-copy utilization of the body
+  buffer bytes, no deserialization is required to form Arrow Arrays
+* An IPC file format can be memory-mapped because it is location agnostic
+  and the bytes of the file are exactly what is expected in memory.
+
+However, there are use cases that aren't handled by this:
+
+* Constructing the IPC record batch message requires allocating a contiguous
+  chunk of bytes and copying all of the data buffers into it, packed together
+  back-to-back. It's exceedingly difficult to zero-copy **create** IPC messages.
+* If the Arrow data is located in a shared memory location, there is no standard
+  way to share the handle to the shared-memory across processes or transports that
+  allow for remote memory accessing, such as UCX.

Review Comment:
   ```suggestion
   * Even if Arrow data is located in memory accessible across process boundaries
     or transports (such as UCX), there is no standard way to specify that shared
     location to consumers which could take advantage of it.
   ```



##########
docs/source/format/DissociatedIPC.rst:
##########
@@ -0,0 +1,376 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _dissociated-ipc:
+
+========================
+Dissociated IPC Protocol
+========================
+
+.. warning::
+
+    Experimental: The Dissociated IPC Protocol is experimental in its current
+    form. Based on feedback and usage the protocol definition may change until
+    it is fully standardized.
+
+Rationale
+=========
+
+The :ref:`Arrow IPC format <format-ipc>` describes a protocol for transferring
+Arrow data as a stream of record batches. This protocol expects a continuous
+stream of bytes divided into discrete messages (using a length prefix and
+continuation indicator). Each discrete message consists of two portions:
+
+* A `Flatbuffers`_ header message
+* A series of bytes consisting of the flattened and packed body buffers (some
+  message types, like Schema messages, do not have this section)
+  - This is referred to as the *message body* in the IPC format spec.
+
+For most cases, the existing IPC format as it currently exists is sufficiently efficient:
+
+* Receiving data in the IPC format allows zero-copy utilization of the body
+  buffer bytes, no deserialization is required to form Arrow Arrays
+* An IPC file format can be memory-mapped because it is location agnostic
+  and the bytes of the file are exactly what is expected in memory.
+
+However, there are use cases that aren't handled by this:
+
+* Constructing the IPC record batch message requires allocating a contiguous
+  chunk of bytes and copying all of the data buffers into it, packed together
+  back-to-back. It's exceedingly difficult to zero-copy **create** IPC messages.
+* If the Arrow data is located in a shared memory location, there is no standard
+  way to share the handle to the shared-memory across processes or transports that
+  allow for remote memory accessing, such as UCX.
+* Arrow data located on a non-CPU device (such as a GPU) cannot be sent using
+  Arrow IPC without having to copy the data back to the host device or copying
+  the Flatbuffers metadata bytes into device memory.
+  
+  * By the same token, receiving IPC messages into device memory would require
+    performing a copy of the Flatbuffers metadata back to the host CPU device. This
+    is due to the fact that the IPC stream interleaves data and metadata across a
+    single stream.
+
+This protocol attempts to solve these use cases in an efficient manner.
+
+Goals
+-----
+
+* Define a generic protocol for passing Arrow IPC data, not tied to any particular
+  transport, that also allows for utilizing non-CPU device memory, shared memory, and
+  newer "high performance" transports such as `UCX`_ or `libfabric`_.
+* Allow for using :ref:`Flight RPC <flight-rpc>` purely for control flow by separating
+  the stream of IPC metadata from IPC body bytes
+  
+  * This allows for the data in the body to be kept on non-CPU devices (like GPUs)

Review Comment:
   Nit: formatting makes this look like a subpoint of Flight RPC, but I think
   it should be a subpoint of the other bullet



##########
docs/source/format/DissociatedIPC/SequenceDiagramSeparate.mmd:
##########
@@ -0,0 +1,47 @@
+%% Licensed to the Apache Software Foundation (ASF) under one
+%% or more contributor license agreements.  See the NOTICE file
+%% distributed with this work for additional information
+%% regarding copyright ownership.  The ASF licenses this file
+%% to you under the Apache License, Version 2.0 (the
+%% "License"); you may not use this file except in compliance
+%% with the License.  You may obtain a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% To generate the diagram, use mermaid-cli
+%% Example: docker run --rm -v $(pwd)/DissociatedIPC:/data minlag/mermaid-cli -i /data/SequenceDiagramSeparate.mmd
+
+sequenceDiagram
+  participant D as Data Stream
+  participant C as Client
+  participant M as Metadata Stream
+
+  activate C
+  C-->>+M: TAG: <want_data> BODY: <opaque ID>
+  C-->>+D: TAG: <want_data> BODY: <opaque ID>
+  M-->>C: MSG: [1] + <sequence number> + <schema metadata>
+  loop each chunk
+    par
+      M-->>C: MSG: [1] + <sequence number> + <IPC metadata>
+    and
+      alt
+        D-->>C: TAG: <inline data> | <sequence number> BODY: <raw body bytes>

Review Comment:
   I'm not sure I understand the notation here. If I look at the description of 
tags at the same time I can read this as "the tag type is Tag::INLINE_DATA 
(which requires raw body bytes), and it includes a sequence number (which all 
tags include... except those sent by the consumer?)". I think this could be 
more understandable if it looked just a bit more formal:
   
   ```
     C-->>+M: TaggedMessage(server.want_data, bytes=ID_of_desired_data)
     C-->>+D: TaggedMessage(server.want_data, bytes=ID_of_desired_data)
     M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata)
     loop each batch
       par
         M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata)
       and
         alt
           D-->>C: TaggedMessage(sequence_number, bytes=batch_data)
         else
           D-->>C: TaggedMessage(sequence_number, uint64_pairs=offset_size_list)
   ```
   
   (also I think if it's correct to substitute batch for chunk that'd be more 
idiomatically Arrow)
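
To make the `bytes([1]) + le_bytes(sequence_number) + ...` and `uint64_pairs=offset_size_list` notation above concrete, here is a minimal Python sketch of the byte layouts as the quoted document describes them: a 1-byte message type followed by a little-endian `uint32` sequence number, and a body of little-endian `uint64` pairs whose first pair is (total size, buffer count). All function and constant names are hypothetical and chosen for illustration; none of them come from the PR or from any Arrow API.

```python
import struct

# Message type byte values as listed in the quoted document.
END_OF_STREAM = 0
IPC_METADATA = 1

# 1-byte type + little-endian uint32 sequence number = 5 bytes, no padding.
_PREFIX = struct.Struct("<BI")


def encode_metadata_message(sequence_number: int, ipc_metadata: bytes) -> bytes:
    """5-byte prefix followed by the Flatbuffers IPC header bytes."""
    # Masking to 32 bits models the roll-over the document allows.
    return _PREFIX.pack(IPC_METADATA, sequence_number & 0xFFFFFFFF) + ipc_metadata


def encode_end_of_stream(sequence_number: int) -> bytes:
    """End of Stream message: exactly 5 bytes."""
    return _PREFIX.pack(END_OF_STREAM, sequence_number & 0xFFFFFFFF)


def decode_prefix(message: bytes):
    """Split a metadata-stream message into (type, sequence number, payload)."""
    if len(message) < _PREFIX.size:
        raise ValueError("message shorter than the 5-byte prefix")
    msg_type, sequence_number = _PREFIX.unpack_from(message)
    return msg_type, sequence_number, message[_PREFIX.size:]


def encode_offset_size_list(buffers: list) -> bytes:
    """Body for the offset/size variant: (total size, count), then (offset, length) pairs.

    `buffers` is a list of (offset_or_address, length) tuples.
    """
    total = sum(length for _, length in buffers)
    out = struct.pack("<2Q", total, len(buffers))
    for offset, length in buffers:
        out += struct.pack("<2Q", offset, length)
    return out
```

With helpers like these, the first line of the loop in the suggested diagram would read as `encode_metadata_message(sequence_number, batch_metadata)`.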



##########
docs/source/format/DissociatedIPC.rst:
##########
@@ -0,0 +1,335 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _dissociated-ipc:
+
+========================
+Dissociated IPC Protocol
+========================
+
+.. warning::
+
+    Experimental: The Dissociated IPC Protocol is experimental in its current
+    form. Based on feedback and usage the protocol definition may change until
+    it is fully standardized.
+
+Rationale
+=========
+
+The :ref:`Arrow IPC format <format-ipc>` describes a protocol for transferring
+Arrow data as a stream of record batches. This protocol expects a continuous
+stream of bytes divided into discrete messages (using a length prefix and
+continuation indicator). Each discrete message consists of two portions:
+
+* A `Flatbuffers`_ header message
+* A series of bytes consisting of the flattened and packed body buffers (some
+  message types, like Schema messages, do not have this section)
+  - This is referred to as the *message body* in the IPC format spec.
+
+For most cases, the existing IPC format as it currently exists is extremely efficient:
+
+* Receiving data in the IPC format allows zero-copy utilization of the body
+  buffer bytes, no deserialization is required to form Arrow Arrays
+* An IPC (Feather) file can be memory-mapped because it is location agnostic
+  and the bytes of the file are exactly what is expected in memory.
+
+However, there are use cases that aren't handled by this:
+
+* Constructing the IPC record batch message requires allocating a contiguous
+  chunk of bytes and copying all of the data buffers into it, packed together
+  back-to-back. It's exceedingly difficult to zero-copy **create** IPC messages.
+* If the Arrow data is located in a shared-memory location, there is no standard
+  way to share the handle to the shared-memory across processes or transports that
+  allow for remote memory accessing.
+* Arrow data located on a non-CPU device (such as a GPU) cannot be sent using

Review Comment:
   IIUC sending in this context refers to sharing memory directly between GPUs. 
If so then this could emphasize that this is a special case of the previous 
point which applies even *within* a process, where colocation of header and 
data should be easiest to relax.



##########
docs/source/format/DissociatedIPC.rst:
##########
@@ -0,0 +1,376 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _dissociated-ipc:
+
+========================
+Dissociated IPC Protocol
+========================
+
+.. warning::
+
+    Experimental: The Dissociated IPC Protocol is experimental in its current
+    form. Based on feedback and usage the protocol definition may change until
+    it is fully standardized.
+
+Rationale
+=========
+
+The :ref:`Arrow IPC format <format-ipc>` describes a protocol for transferring
+Arrow data as a stream of record batches. This protocol expects a continuous
+stream of bytes divided into discrete messages (using a length prefix and
+continuation indicator). Each discrete message consists of two portions:
+
+* A `Flatbuffers`_ header message
+* A series of bytes consisting of the flattened and packed body buffers (some
+  message types, like Schema messages, do not have this section)
+  - This is referred to as the *message body* in the IPC format spec.
+
+For most cases, the existing IPC format as it currently exists is sufficiently efficient:
+
+* Receiving data in the IPC format allows zero-copy utilization of the body
+  buffer bytes, no deserialization is required to form Arrow Arrays
+* An IPC file format can be memory-mapped because it is location agnostic
+  and the bytes of the file are exactly what is expected in memory.
+
+However, there are use cases that aren't handled by this:
+
+* Constructing the IPC record batch message requires allocating a contiguous
+  chunk of bytes and copying all of the data buffers into it, packed together
+  back-to-back. It's exceedingly difficult to zero-copy **create** IPC messages.
+* If the Arrow data is located in a shared memory location, there is no standard
+  way to share the handle to the shared-memory across processes or transports that
+  allow for remote memory accessing, such as UCX.
+* Arrow data located on a non-CPU device (such as a GPU) cannot be sent using
+  Arrow IPC without having to copy the data back to the host device or copying
+  the Flatbuffers metadata bytes into device memory.
+  
+  * By the same token, receiving IPC messages into device memory would require
+    performing a copy of the Flatbuffers metadata back to the host CPU device. This
+    is due to the fact that the IPC stream interleaves data and metadata across a
+    single stream.
+
+This protocol attempts to solve these use cases in an efficient manner.
+
+Goals
+-----
+
+* Define a generic protocol for passing Arrow IPC data, not tied to any particular
+  transport, that also allows for utilizing non-CPU device memory, shared memory, and
+  newer "high performance" transports such as `UCX`_ or `libfabric`_.
+* Allow for using :ref:`Flight RPC <flight-rpc>` purely for control flow by separating
+  the stream of IPC metadata from IPC body bytes
+  
+  * This allows for the data in the body to be kept on non-CPU devices (like GPUs)
+    without expensive device-to-host copies.
+
+Definitions
+-----------
+
+   IPC Metadata
+       The Flatbuffers message bytes that encompass the header of an Arrow IPC message
+  
+   Tag
+       A little-endian ``uint64`` value used as an ID for a message. Specific bits can
+       be masked to allow identifying messages by only a portion of the tag, leaving the
+       rest of the bits to be used for control flow or other message metadata.
+
+   Sequence Number
+       A little-endian, 4-byte unsigned integer starting at 0 for a stream, indicating
+       the sequence order of messages. It is also used to identify specific messages to
+       tie the IPC metadata header to its corresponding body since the metadata and body
+       can be sent across separate pipes/streams/transports.
+
+       If a sequence number reaches ``UINT32_MAX``, it should be allowed to roll over as
+       it is unlikely there would be enough unprocessed messages waiting to be processed
+       that would cause an overlap of sequence numbers.
+
+       The sequence number serves two purposes: To identify corresponding metadata and
+       tagged body data messages and to ensure we do not rely on messages having to arrive
+       in order. A client should use the sequence number to correctly order messages as
+       they arrive for processing.
+
+The Protocol
+============
+
+A reference example implementation utilizing `libcudf`_ and `UCX`_ can be found at
+https://github.com/zeroshade/cudf-flight-ucx.
+
+Requirements
+------------
+
+A transport implementing this protocol **MUST** provide two pieces of functionality:
+
+* Message sending
+  
+  * Delimited messages (like gRPC) as opposed to non-delimited streams (like plain TCP
+    without further framing).
+  
+  * Alternatively, a framing mechanism like the :ref:`encapsulated message format <ipc-message-format>`
+    for the IPC protocol can be used while leaving out the body bytes.
+
+* Tagged message sending
+
+  * Sending a message that has an attached little-endian, unsigned 64-bit integral tag
+    for control flow. A tag like this allows control flow to operate on a message whose body
+    is on a non-CPU device without requiring the message itself to get copied off of the device.
+
+URI Specification
+-----------------
+
+When providing a URI to a consumer to contact for use with this protocol (such as via
+the :ref:`Location URI for Flight <flight-location-uris>`), the URI should specify a scheme
+like *ucx:* or *fabric:*, that is easily identifiable. In addition, the URI should
+encode the following URI query parameters:
+
+.. note::
+    As this protocol matures, this document will get updated with commonly recognized
+    transport schemes that get used with it.
+
+* ``want_data`` - **REQUIRED** - uint64 integer value
+  
+  * This value should be used to tag an initial message to the server to initiate a
+    data transfer. The body of the initiating message should be an opaque binary identifier
+    of the data stream being requested (like the ``Ticket`` in the Flight RPC protocol)
+
+* ``free_data`` - **OPTIONAL** - uint64 integer value
+
+  * If the server might send messages using offsets / addresses for remote memory accessing
+    or shared memory locations, the URI should include this parameter. This value is used to
+    tag messages sent from the client to the data server, containing specific offsets / addresses
+    which were provided that are no longer required by the client (i.e. any operations that
+    directly reference those memory locations, such as copying the remote data into local memory,
+    have been completed).
+
+* ``remote_handle`` - **OPTIONAL** - base64-encoded string
+
+  * When working with shared memory or remote memory, this value indicates any required
+    handle or identifier that is necessary for accessing the memory.
+
+    * Using UCX, this would be an *rkey* value
+
+    * With CUDA IPC, this would be the value of the base GPU pointer or memory handle,
+      and subsequent addresses would be offsets from this base pointer.
+
+Handling of Backpressure
+------------------------
+
+*Currently* this proposal does not specify any way to manage the backpressure of
+messages to throttle for memory and bandwidth reasons. For now, this will be 
+**transport-defined** rather than lock into something sub-optimal. 
+
+As usage among different transports and libraries grows, common patterns will emerge
+that will allow for a generic, but efficient, way to handle backpressure across
+different use cases.
+
+.. note::
+  While the protocol itself is transport agnostic, the current usage and examples
+  only have been tested using UCX and libfabric transports so far, but that's all.
+
+
+Protocol Description
+====================
+
+There are two possibilities that can occur:
+
+1. The streams of metadata and body data are sent across separate connections
+
+.. figure:: ./DissociatedIPC/SequenceDiagramSeparate.mmd.svg
+
+2. The streams of metadata and body data are sent simultaneously across the
+   same connection
+
+.. figure:: ./DissociatedIPC/SequenceDiagramSame.mmd.svg
+
+Server Sequence
+---------------
+
+There can be either a single server handling both the IPC Metadata stream and the
+Body data streams, or separate servers for handling the IPC Metadata and the body
+data. This allows for streaming of data across either a single transport pipe or
+two pipes if desired.
+
+Metadata Stream Sequence
+''''''''''''''''''''''''
+
+The standing state of the server is waiting for a **tagged** message with a specific
+``<want_data>`` tag value to initiate a transfer. This ``<want_data>`` value is defined
+by the server and propagated to any clients via the URI they are provided. This protocol
+does not prescribe any particular value so that it will not interfere with any other
+existing protocols that rely on tag values. The body of that message will contain an
+opaque, binary identifier to indicate a particular dataset / data stream to send.
+
+.. note::
+
+  For instance, the **ticket** that was passed with a *FlightInfo* message would be
+  the body of this message. Because it is opaque, it can be anything the server wants
+  to use. The URI and identifier do not need to be given to the client via Flight RPC,
+  but could come across from any transport or protocol desired.
+
+Upon receiving a ``<want_data>`` request, the server *should* respond by sending a stream
+of messages consisting of the following:
+
+* A 5-byte prefix
+  
+  - The first byte of the message indicates the type of message, currently there are only
+    two allowed message types (more types may get added in the future):
+  
+    0) End of Stream
+    1) Flatbuffers IPC Metadata Message
+  
+  - the next 4-bytes are a little-endian, unsigned 32-bit integer indicating the sequence number of
+    the message. The first message in the stream (**MUST** always be a schema message) **MUST**
+    have a sequence number of ``0``. Each subsequent message **MUST** increment the number by
+    ``1``.
+
+* The full Flatbuffers bytes of an Arrow IPC header
+
+As defined in the Arrow IPC format, each metadata message can represent a chunk of data or
+dictionaries for use by the stream of data. 
+
+After sending the last metadata message, the server **MUST** indicate the end of the stream
+by sending a message consisting of **exactly** 5 bytes:
+
+* The first byte is ``0``, indicating an **End of Stream** message
+* The last 4 bytes are the sequence number (4-byte, unsigned integer in little-endian byte order)
+
+Data Stream Sequence
+''''''''''''''''''''
+
+If a single server is handling both the data and metadata streams, then the data messages
+**should** begin being sent to the client in parallel with the metadata messages. Otherwise,
+as with the metadata sequence, the standing state of the server is to wait for a **tagged**
+message with the ``<want_data>`` tag value, whose body indicates the dataset / data stream
+to send to the client.
+
+For each IPC message in the stream of data, a **tagged** message **MUST** be sent on the data
+stream if that message has a body (i.e. a Record Batch or Dictionary message). The
+:term:`tag <Tag>` for each message should be structured as follows:
+
+* The *least significant* 4-bytes (bits 0 - 31) of the tag should be the unsigned 32-bit, little-endian sequence
+  number of the message.
+* The *most significant* byte (bits 56 - 63) of the tag indicates the message body **type** as an 8-bit
+  unsigned integer. Currently only two message types are specified, but more can be added as
+  needed to expand the protocol:
+  
+  0) The body contains the raw body buffer bytes as a packed buffer (i.e. the standard IPC
+     format body bytes)
+  1) The body contains a series of unsigned, little-endian 64-bit integer pairs to represent
+     either shared or remote memory, schematically structured as
+  
+     * The first two integers (e.g. the first 16 bytes) represent the *total* size (in bytes)
+       of all buffers and the number of buffers in this message (and thus the number of following
+       pairs of ``uint64``)
+  
+     * Each subsequent pair of ``uint64`` values are an address / offset followed the length of
+       that particular buffer.
+
+* All unspecified bits (bits 32 - 55) of the tag are *reserved* for future use by potential updates
+  to this protocol. For now they **MUST** be 0.
+

Review Comment:
   It might be worthwhile to add some graphical clarification of tag structure 
https://github.com/apache/arrow/blob/1982c2523f5791a5b742697185fd04ee580b464d/docs/source/format/Columnar.rst?plain=1#L268
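
Alongside a diagram, a short sketch may help readers check their reading of the bit positions. The Python below packs and unpacks a tag following the layout in the quoted text (sequence number in bits 0 - 31, reserved bits 32 - 55 set to 0, body type in bits 56 - 63). The names `pack_tag`, `unpack_tag`, and the constants are hypothetical and used only for illustration; they are not part of the PR or of any Arrow library.

```python
# Tag layout as described in the quoted document (bit 63 on the left):
#
#   63        56 55                    32 31                     0
#  +------------+------------------------+------------------------+
#  | body type  |  reserved (must be 0)  |    sequence number     |
#  +------------+------------------------+------------------------+

SEQUENCE_MASK = 0xFFFFFFFF          # bits 0 - 31
RESERVED_MASK = 0x00FFFFFF << 32    # bits 32 - 55
TYPE_SHIFT = 56                     # body type occupies bits 56 - 63

BODY_RAW_BYTES = 0      # body is the packed IPC body bytes
BODY_OFFSET_SIZES = 1   # body is a list of uint64 (offset, length) pairs


def pack_tag(body_type: int, sequence_number: int) -> int:
    """Build the 64-bit tag value; reserved bits are left as 0."""
    if not 0 <= body_type <= 0xFF:
        raise ValueError("body type must fit in 8 bits")
    # Masking the sequence number models the uint32 roll-over.
    return (body_type << TYPE_SHIFT) | (sequence_number & SEQUENCE_MASK)


def unpack_tag(tag: int):
    """Return (body_type, sequence_number), rejecting non-zero reserved bits."""
    if not 0 <= tag <= 0xFFFFFFFFFFFFFFFF:
        raise ValueError("tag must fit in 64 bits")
    if tag & RESERVED_MASK:
        raise ValueError("reserved bits 32 - 55 must be 0")
    return tag >> TYPE_SHIFT, tag & SEQUENCE_MASK
```

On the wire the tag is little-endian, so `pack_tag(1, 5).to_bytes(8, "little")` puts the sequence number in the first four bytes and the body type in the last byte.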


