Ah, I see. I think both projects can proceed as well. At some point we will have to figure out how to merge them, but I think it's too early to see how exactly we will want to refactor things.
I looked over the code and I don't have any important comments for now. Looking forward to reviewing when it's ready.

-David

On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> On 12/29/21 11:03 PM, David Li wrote:
> > Awesome, thanks for sharing this too!
> >
> > The refactoring you have with DataClientStream is what I would like to do as well - I think much of the existing code can be adapted to be more transport-agnostic, and then it will be easier to support new transports (whether data-only or for all methods).
> >
> > Where do you see the gaps between gRPC and this? I think what would happen is 1) client calls GetFlightInfo, 2) server returns a `shm://` URI, 3) client sees the unfamiliar prefix and creates a new client for the DoGet call (it would have to do this anyway if, for instance, the GetFlightInfo call returned the address of a different server).
>
> I mean implementation details. Some unit tests run longer than expected (data plane timeouts reading from an ended stream). It looks like the gRPC stream finish message is not correctly intercepted and forwarded to the data plane. I don't think it's a big problem, it just needs some time to debug.
>
> > I also wonder how this stacks up to UCX's shared memory backend (I did not test this though).
>
> I implemented a shared memory data plane only to verify and consolidate the data plane design, as it's the easiest (and still useful) driver. I also plan to implement a socket-based data plane - not useful in practice, only to make sure the design works across a network. Then we can add more useful drivers like UCX or DPDK (the benefit of DPDK is that it works on commodity hardware, unlike UCX/RDMA which requires expensive equipment).
>
> > I would like to be able to support entire new transports for certain cases (namely browser support - though perhaps one of the gRPC proxies would suffice there), but even in that case, we could make it so that a new transport only needs to implement the data plane methods. Only having to support the data plane methods would save significant implementation effort for all non-browser cases, so I think it's a worthwhile approach.
>
> Thanks for being interested in this approach. My current plan is to first refactor the shared memory data plane to verify it beats gRPC in local RPC by a considerable margin - otherwise there must be big mistakes in my design. After that I will fix the unit test issues and deliver it for community review.
>
> Anyway, don't let me block your implementations. And if you think it's useful, I can push the current code for more detailed discussion.
>
> > -David
> >
> > On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> >> Thanks, David, for initiating the UCX integration - great work!
> >>
> >> I think a 5 Gbps network is too limited for performance evaluation. I will try the patch on a 100 Gb RDMA network; hopefully we can see some improvements.
> >>
> >> I once benchmarked Flight over a 100 Gb network [1]: gRPC-based throughput is 2.4 GB/s for one thread, 8.8 GB/s for six threads, with about 60 us latency. I also benchmarked raw RDMA performance (same batch sizes as Flight): one thread can achieve 9 GB/s with 12 us latency. Of course the comparison is not fair. With David's patch, we can get a more realistic comparison.
> >>
> >> I'm implementing a data plane approach in the hope that we can adopt new data acceleration methods easily. My approach is to replace only the FlightData transmission of DoGet/Put/Exchange with data plane drivers, while gRPC is still used for all RPC calls.
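> >>
> >> To make the split concrete, below is a rough C++ sketch of the kind of driver interface I mean. It is illustrative only - the DataPlaneDriver name and method names are made up for this email and don't necessarily match the code in my repo; arrow::flight::FlightPayload and arrow::Buffer are the existing Arrow types.
> >>
> >>   #include <memory>
> >>
> >>   #include "arrow/buffer.h"
> >>   #include "arrow/flight/types.h"
> >>   #include "arrow/result.h"
> >>   #include "arrow/status.h"
> >>
> >>   // Illustrative only: gRPC keeps carrying the control-plane calls
> >>   // (GetFlightInfo, DoAction, ...), while the FlightData stream of
> >>   // DoGet/DoPut/DoExchange is handed off to a pluggable driver.
> >>   class DataPlaneDriver {
> >>    public:
> >>     virtual ~DataPlaneDriver() = default;
> >>     // Write one FlightData message (IPC metadata + body buffers).
> >>     virtual arrow::Status Write(const arrow::flight::FlightPayload& payload) = 0;
> >>     // Read the raw bytes of the next FlightData message; an empty
> >>     // buffer signals end of stream.
> >>     virtual arrow::Result<std::shared_ptr<arrow::Buffer>> Read() = 0;
> >>     // Tear down once gRPC signals that the call has finished.
> >>     virtual arrow::Status Close() = 0;
> >>   };
> >>
> >>   // The shared memory (PoC), socket, and later UCX/DPDK drivers would
> >>   // all sit behind this same interface.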
> >>
> >> Code is at my github repo [2]. Besides the framework, I have just implemented a shared memory data plane driver as a PoC. The Get/Put/Exchange unit tests pass, TestCancel hangs, and some unit tests run longer than expected - still debugging. The shared memory data plane performance is pretty bad right now, due to repeated map/unmap for each read/write; pre-allocated pages should improve it a lot. Still experimenting.
> >>
> >> Would like to hear community comments.
> >>
> >> My personal opinion is that the data plane approach reuses the gRPC control plane and may make it easier to add new data acceleration methods, but it needs to fit into gRPC seamlessly (there are still unresolved gaps). A new transport requires much more initial effort, but may pay off later. And it looks like these two approaches don't conflict with each other.
> >>
> >> [1] Test environment
> >>     nics: mellanox connectx5
> >>     hosts: client (neoverse n1), server (xeon gold 5218)
> >>     os: ubuntu 20.04, linux kernel 5.4
> >>     test case: 128k batch size, DoGet
> >>
> >> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> >>
> >> ________________________________
> >> From: David Li <lidav...@apache.org>
> >> Sent: Wednesday, December 29, 2021 3:09 AM
> >> To: dev@arrow.apache.org <dev@arrow.apache.org>
> >> Subject: Re: Arrow in HPC
> >>
> >> I ended up drafting an implementation of Flight based on UCX, and doing some of the necessary refactoring to support additional backends in the future. It can run the Flight benchmark, and performance is about comparable to gRPC, as tested on AWS EC2.
> >>
> >> The implementation is based on the UCP streams API. It's extremely bare-bones and is really only a proof of concept; a good amount of work is needed to turn it into a usable implementation. I had hoped it would perform markedly better than gRPC, at least in this early test, but this seems not to be the case. That said: I am likely not using UCX properly, UCX would still open up support for additional hardware, and this work should allow other backends to be implemented more easily.
> >>
> >> The branch can be viewed at https://github.com/lidavidm/arrow/tree/flight-ucx
> >>
> >> I've attached the benchmark output at the end.
> >>
> >> There are still quite a few TODOs and things that need investigating:
> >>
> >> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> >> - Concurrent requests are not supported, nor even making more than one request on a connection, nor does the server support concurrent clients. We also need to decide whether to support concurrent requests at all, and how (e.g. pooling multiple connections, implementing a gRPC/HTTP2-style protocol, or even possibly implementing HTTP2).
> >> - We need to make sure we properly handle errors, etc. everywhere.
> >> - Are we using UCX in a performant and idiomatic manner? Will the implementation work well on RDMA and other specialized hardware?
> >> - Do we also need to support the UCX tag API?
> >> - Can we refactor out interfaces that allow sharing more of the client/server implementation between different backends? (A rough sketch of one possible shape follows after this list.)
> >> - Are the abstractions sufficient to support other potential backends like MPI, libfabrics, or WebSockets?
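> >>
> >> On the interface question: one very rough shape it could take - purely hypothetical, nothing like this exists in the branch yet, and all names below are made up - is a per-backend client transport chosen from the Location's URI scheme ("grpc+tcp://", "ucx://", ...), with the public FlightClient delegating to it so most client logic is written once.
> >>
> >>   #include <memory>
> >>
> >>   #include "arrow/flight/client.h"
> >>   #include "arrow/flight/types.h"
> >>   #include "arrow/status.h"
> >>
> >>   // Hypothetical: each backend (gRPC, UCX, ...) implements this, and
> >>   // FlightClient picks an implementation based on the Location scheme.
> >>   class ClientTransport {
> >>    public:
> >>     virtual ~ClientTransport() = default;
> >>     virtual arrow::Status Connect(const arrow::flight::Location& location) = 0;
> >>     virtual arrow::Status GetFlightInfo(
> >>         const arrow::flight::FlightCallOptions& options,
> >>         const arrow::flight::FlightDescriptor& descriptor,
> >>         std::unique_ptr<arrow::flight::FlightInfo>* info) = 0;
> >>     virtual arrow::Status DoGet(
> >>         const arrow::flight::FlightCallOptions& options,
> >>         const arrow::flight::Ticket& ticket,
> >>         std::unique_ptr<arrow::flight::FlightStreamReader>* stream) = 0;
> >>     // ... DoPut/DoExchange/DoAction etc. would follow the same pattern.
> >>   };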
> >>
> >> If anyone has experience with UCX, I'd appreciate any feedback. Otherwise, I'm hoping to plan out and try to tackle some of the TODOs above, and figure out how this effort can proceed.
> >>
> >> Antoine/Micah raised the possibility of extending gRPC instead. That would be preferable, frankly, given that otherwise we might have to re-implement a lot of what gRPC and HTTP2 provide ourselves. However, the necessary proposal stalled and was dropped without much discussion: https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> >>
> >> Benchmark results (also uploaded at https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> >>
> >> Testing was done between two t3.xlarge instances in the same zone. t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> >>
> >> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> >> Testing method: DoGet
> >> [1640703417.639373] [ip-172-31-37-78:10110:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >> [1640703417.650068] [ip-172-31-37-78:10110:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >> Number of perf runs: 1
> >> Number of concurrent gets/puts: 1
> >> Batch size: 131072
> >> Batches read: 10000
> >> Bytes read: 1310720000
> >> Nanos: 2165862969
> >> Speed: 577.137 MB/s
> >> Throughput: 4617.1 batches/s
> >> Latency mean: 214 us
> >> Latency quantile=0.5: 209 us
> >> Latency quantile=0.95: 340 us
> >> Latency quantile=0.99: 409 us
> >> Latency max: 6350 us
> >> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> >> Testing method: DoGet
> >> [1640703439.428785] [ip-172-31-37-78:10116:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >> [1640703439.440359] [ip-172-31-37-78:10116:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> >> Number of perf runs: 1
> >> Number of concurrent gets/puts: 1
> >> Batch size: 2097152
> >> Batches read: 10000
> >> Bytes read: 20971520000
> >> Nanos: 34184175236
> >> Speed: 585.066 MB/s
> >> Throughput: 292.533 batches/s
> >> Latency mean: 3415 us
> >> Latency quantile=0.5: 3408 us
> >> Latency quantile=0.95: 3549 us
> >> Latency quantile=0.99: 3800 us
> >> Latency max: 20236 us
> >> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
> >> Testing method: DoGet
> >> Using standalone TCP server
> >> Server host: 172.31.34.4
> >> Server port: 31337
> >> Number of perf runs: 1
> >> Number of concurrent gets/puts: 1
> >> Batch size: 131072
> >> Batches read: 10000
> >> Bytes read: 1310720000
> >> Nanos: 2375289668
> >> Speed: 526.252 MB/s
> >> Throughput: 4210.01 batches/s
> >> Latency mean: 235 us
> >> Latency quantile=0.5: 203 us
> >> Latency quantile=0.95: 328 us
> >> Latency quantile=0.99: 1377 us
> >> Latency max: 17860 us
> >> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
> >> Testing method: DoGet
> >> Using standalone TCP server
> >> Server host: 172.31.34.4
> >> Server port: 31337
> >> Number of perf runs: 1
> >> Number of concurrent gets/puts: 1
> >> Batch size: 2097152
> >> Batches read: 10000
> >> Bytes read: 20971520000
> >> Nanos: 34202704498
> >> Speed: 584.749 MB/s
> >> Throughput: 292.375 batches/s
> >> Latency mean: 3416 us
> >> Latency quantile=0.5: 3406 us
> >> Latency quantile=0.95: 3548 us
> >> Latency quantile=0.99: 3764 us
> >> Latency max: 17086 us
> >> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
> >> Connecting to host 172.31.34.4, port 1337
> >> [ 5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> >> [ ID] Interval          Transfer     Bitrate         Retr  Cwnd
> >> [ 5]  0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> >> [ 5]  1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> >> [ 5]  2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >> [ 5]  3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >> [ 5]  4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >> [ 5]  5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> >> [ 5]  6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> >> [ 5]  7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> >> [ 5]  8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> >> [ 5]  9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> >> - - - - - - - - - - - - - - - - - - - - - - - - -
> >> [ ID] Interval          Transfer     Bitrate         Retr
> >> [ 5]  0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36   sender
> >> [ 5]  0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec        receiver
> >>
> >> iperf Done.
> >>
> >> Best,
> >> David
> >>
> >> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> >>> "David Li" <lidav...@apache.org> writes:
> >>>
> >>>> Thanks for the clarification Yibo, looking forward to the results. Even if it is a very hacky PoC it will be interesting to see how it affects performance, though as Keith points out there are benefits in general to UCX (or a similar library), and we can work out the implementation plan from there.
> >>>>
> >>>> To Benson's point - the work done to get UCX supported would pave the way to supporting other backends as well. I'm personally not familiar with UCX, MPI, etc., so is MPI here more about playing well with established practices, or does it also offer potential hardware support/performance improvements like UCX would?
> >>>
> >>> There are two main implementations of MPI, MPICH and Open MPI, both of which are permissively licensed open source community projects. Both have direct support for UCX and, unless your needs are very specific, the overhead of going through MPI is likely to be negligible. Both also have proprietary derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI derivative), which may have optimizations for proprietary networks. Both MPICH and Open MPI can be built without UCX, and this is often easier (UCX 'master' is more volatile in my experience).
> >>>
> >>> The vast majority of distributed memory scientific applications use MPI or higher-level libraries, rather than writing directly to UCX (which provides less coverage of HPC networks). I think MPI compatibility is important.
> >>>
> >>> From way up-thread (sorry):
> >>>
> >>>>>>>>>> Jed - how would you see MPI and Flight interacting? As another transport/alternative to UCX? I admit I'm not familiar with the HPC space.
> >>>
> >>> MPI has collective operations like MPI_Allreduce (perform a reduction and give every process the result; these run in log(P) or better time with small constants -- 15 microseconds is typical for a cheap reduction operation on a million processes). MPI supports user-defined operations for reductions and prefix-scan operations. If we defined MPI_Ops for Arrow types, we could compute summary statistics and other algorithmic building blocks fast at arbitrary scale.
> >>>
> >>> The collective execution model might not be everyone's bag, but MPI_Op can also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op), and dropping into collective mode has big advantages for certain algorithms in computational statistics/machine learning.
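> >>>
> >>> To make that concrete, here is a tiny hypothetical sketch (plain MPI from C++, no Arrow code; the {sum, count} layout and all names are made up) of a user-defined MPI_Op that a distributed "mean" summary statistic could build on:
> >>>
> >>>   #include <mpi.h>
> >>>
> >>>   // Each element is a pair of doubles: {partial sum, count}.
> >>>   static void SumCountReduce(void* invec, void* inoutvec, int* len,
> >>>                              MPI_Datatype*) {
> >>>     double* in = static_cast<double*>(invec);
> >>>     double* inout = static_cast<double*>(inoutvec);
> >>>     for (int i = 0; i < *len; ++i) {
> >>>       inout[2 * i] += in[2 * i];          // sum
> >>>       inout[2 * i + 1] += in[2 * i + 1];  // count
> >>>     }
> >>>   }
> >>>
> >>>   int main(int argc, char** argv) {
> >>>     MPI_Init(&argc, &argv);
> >>>
> >>>     // A derived datatype for the pair, plus the user-defined reduction.
> >>>     MPI_Datatype pair_type;
> >>>     MPI_Type_contiguous(2, MPI_DOUBLE, &pair_type);
> >>>     MPI_Type_commit(&pair_type);
> >>>     MPI_Op sum_count_op;
> >>>     MPI_Op_create(&SumCountReduce, /*commute=*/1, &sum_count_op);
> >>>
> >>>     // In a real integration, each rank would fold its local Arrow column
> >>>     // into this pair; one log(P)-time Allreduce then gives every rank
> >>>     // the global sum and count (hence the mean).
> >>>     double local[2] = {42.0, 10.0};
> >>>     double global[2];
> >>>     MPI_Allreduce(local, global, 1, pair_type, sum_count_op, MPI_COMM_WORLD);
> >>>
> >>>     MPI_Op_free(&sum_count_op);
> >>>     MPI_Type_free(&pair_type);
> >>>     MPI_Finalize();
> >>>     return 0;
> >>>   }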