[ https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822616#comment-16822616 ]
binwei yang commented on SPARK-27396:
-------------------------------------

This is the same proposal we are working on. I have a talk at Spark Summit; we could talk after the session.

h4. [Apache Arrow-Based Unified Data Sharing and Transferring Format Among CPU and Accelerators|https://databricks.com/sparkaisummit/sessions-single-2019/?id=42]

> SPIP: Public APIs for extended Columnar Processing Support
> ----------------------------------------------------------
>
>                 Key: SPARK-27396
>                 URL: https://issues.apache.org/jira/browse/SPARK-27396
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Robert Joseph Evans
>            Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely no jargon.
>
> The Dataset/DataFrame API in Spark currently exposes data to users only one row at a time when processing it. The goals of this proposal are to:
> # Expose to end users a new option of processing the data in a columnar format, multiple rows at a time, with the data organized into contiguous arrays in memory.
> # Make any transitions between the columnar memory layout and a row-based layout transparent to the end user.
> # Allow for simple data exchange with other systems, DL/ML libraries, pandas, etc. by having clean APIs to transform the columnar data into an Apache Arrow compatible layout (a rough sketch of what this looks like today follows Q2 below).
> # Provide a plugin mechanism for columnar processing support so an advanced user could avoid data transitions between columnar and row-based processing, even through shuffles. This means we should at least support pluggable APIs so an advanced end user can implement the columnar partitioning themselves, and provide the glue necessary to shuffle the data while it is still in a columnar format.
> # Expose new APIs that allow advanced users or frameworks to implement columnar processing either as UDFs, or by adjusting the physical plan to do columnar processing. If the latter is too controversial we can move it to another SPIP, but we plan to implement some accelerated computing in parallel with this feature to be sure the APIs work, and without it that would be impossible.
>
> Not requirements, but things that would be nice to have:
> # Provide default implementations for partitioning columnar data, so users don't have to.
> # Transition the existing in-memory columnar layouts to be compatible with Apache Arrow. This would make the transformations to the Apache Arrow format a no-op. The existing formats are already very close to those layouts in many cases. This would not mean using the Apache Arrow Java library, but instead being compatible with the memory [layout|https://arrow.apache.org/docs/format/Layout.html], and possibly only a subset of that layout.
> # Provide a clean transition from the existing code to the new one. The existing APIs, which are public but evolving, are not that far off from what is being proposed. We should be able to create a new parallel API that can wrap the existing one. This means any file format that is trying to support columnar can still do so until we make a conscious decision to deprecate and then turn off the old APIs.
>
> *Q2.* What problem is this proposal NOT designed to solve?
>
> This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation, and possibly default implementations for partitioning of a columnar shuffle.
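> For a concrete sense of what goal 3 above is aiming at, here is a minimal sketch using the ColumnarBatch and ArrowColumnVector classes that already exist in Spark: Arrow-allocated memory is wrapped in Spark's columnar classes and can still be consumed row by row. The single int column, its name, and the allocator setup are just illustrative choices, not part of the proposal.
>
> {code:scala}
> import org.apache.arrow.memory.RootAllocator
> import org.apache.arrow.vector.IntVector
> import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}
>
> // Build a single Arrow-backed column holding three ints.
> val allocator = new RootAllocator(Long.MaxValue)
> val intVector = new IntVector("value", allocator)
> intVector.allocateNew(3)
> Seq(1, 2, 3).zipWithIndex.foreach { case (v, i) => intVector.setSafe(i, v) }
> intVector.setValueCount(3)
>
> // Wrap the Arrow vector in Spark's ColumnVector abstraction and expose it as a batch.
> val columns: Array[ColumnVector] = Array(new ArrowColumnVector(intVector))
> val batch = new ColumnarBatch(columns)
> batch.setNumRows(3)
>
> // Row-oriented consumers can still iterate the batch one InternalRow at a time.
> val it = batch.rowIterator()
> while (it.hasNext) {
>   println(it.next().getInt(0))
> }
> {code}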
> *Q3.* How is it done today, and what are the limits of current practice?
>
> The current columnar support is limited to three areas.
> # Input formats can optionally return a ColumnarBatch instead of rows. The code generation phase knows how to take that columnar data and iterate through it as rows for stages that want rows, which currently is almost everything. The limitations here are mostly implementation specific. The current standard is to abuse Scala's type erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The code generation can handle this because it is generating Java code, so it bypasses Scala's type checking and just casts the InternalRow to the desired ColumnarBatch (a sketch of this pattern follows the list below). This makes it difficult for others to implement the same functionality for different processing because they can only do it through code generation. There really is no clean, separate path in the code generation for columnar vs. row-based processing. Additionally, because it is only supported through code generation, there is no fallback if code generation fails for any reason. This is typically fine for input formats but can be problematic when we get into more extensive processing.
> # When caching data, it can optionally be cached in a columnar format if the input is also columnar. This is similar to the first area and has the same limitations because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
> # Pandas vectorized processing. To be able to support Pandas UDFs, Spark will build up a batch of data and send it to Python for processing, and then get a batch of data back as a result. The format of the data being sent to Python can be either pickle, which is the default, or optionally Arrow. The result returned is in the same format. The limitations here are really around performance: transforming the data back and forth can be very expensive.
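> To make the type-erasure pattern from the first area above concrete, here is a minimal sketch (not the actual Spark code, just an illustration of the pattern described): the producer smuggles ColumnarBatch objects through an RDD declared as RDD[InternalRow], and a consumer that knows the convention casts each element back, much as the generated Java code does today. The scan() helper is hypothetical.
>
> {code:scala}
> import scala.collection.JavaConverters._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.vectorized.ColumnarBatch
>
> // Producer side: an RDD that actually holds ColumnarBatch objects is declared as
> // RDD[InternalRow]; type erasure means nothing verifies this at runtime.
> def scan(): RDD[ColumnarBatch] = ???  // hypothetical columnar source
> val disguised: RDD[InternalRow] = scan().asInstanceOf[RDD[InternalRow]]
>
> // Consumer side: code that knows the convention casts each "row" back to a batch
> // and iterates it row by row, mirroring what the generated Java code does.
> val rows: RDD[InternalRow] = disguised.mapPartitions { iter =>
>   iter.flatMap { element =>
>     val batch = element.asInstanceOf[ColumnarBatch]
>     batch.rowIterator().asScala
>   }
> }
> {code}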
> *Q4.* What is new in your approach and why do you think it will be successful?
>
> Conceptually, what we are primarily doing is cleaning up a lot of existing functionality and getting it ready to be exposed as public-facing APIs. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be done efficiently in Spark.
>
> *Q5.* Who cares? If you are successful, what difference will it make?
>
> Anyone who wants to integrate Spark with other data processing systems will care, as it provides a cleaner, more standards-based way of transferring the data. Anyone who wants to experiment with accelerated computing, either on a CPU or a GPU, will benefit, as it provides a supported way to make this work instead of having to hack something in, which was not possible before.
>
> *Q6.* What are the risks?
>
> Technologically I don't see many risks. We have done a proof-of-concept implementation that shows it can be done. The biggest risk is that Apache Arrow loses favor as an interchange format between different systems. In the worst case that means we may have to support transforming the data into other formats, but because Arrow as the internal format is not a requirement, and there really are not that many ways to lay out columnar data, it should require only minor changes, if any.
>
> The second risk is that we don't set up the API in a powerful and flexible enough way for users to really take full advantage of it. This will really come with time, and if we start off fairly conservatively in our API we can hopefully expand it in the future.
>
> The final risk is around dictionaries. The current columnar layout exposes the dictionary as just a Java class. When we go to an Arrow-compatible layout, the dictionary has a specific format. The Parquet APIs don't give raw access to the layout of the dictionary. This is not something that we cannot overcome; it just means we may have less-than-optimal translations, in terms of performance, when going from Parquet-formatted data to Arrow until we can get some changes into the Parquet API. This may be true of ORC as well. It is no worse than the situation today.
>
> *Q7.* How long will it take?
>
> I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs would likely put it at a month and a half to two months. So one quarter would give us enough time to get through everything, including reviews, if things go well.
>
> *Q8.* What are the mid-term and final "exams" to check for success?
>
> The first check for success would be to successfully transition the existing columnar code over to using this: transforming and sending the data to Python for processing by Pandas UDFs, and having the file formats and caching code use a cleaner, more explicit columnar API.
>
> The final check is really about adoption: seeing whether the follow-on work that we, and hopefully others, do shows that it cleanly enables something that was not possible before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org