[ https://issues.apache.org/jira/browse/SPARK-24579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17361175#comment-17361175 ]
binwei yang commented on SPARK-24579:
-------------------------------------

We implemented a native Apache Arrow data source and a native Arrow-to-DMatrix conversion. This reduces the data preparation time to a negligible level, which actually matters more for inference than for training. The details are in this post: [https://medium.com/intel-analytics-software/optimizing-the-end-to-end-training-pipeline-on-apache-spark-clusters-80261d6a7b8c]

During the implementation, we found three points that could be improved on the Spark side:
* There are several different ColumnarBatch definitions in Spark. Now that Arrow is mature and its data format is stable, why don't we use Arrow's columnar batch format directly, for example for RDD caching?
* We still don't have an easy way to get an RDD[ColumnarBatch]. Here is what we use today (a possible shape for a convenience helper is sketched after this list):
{code:java}
val qe = new QueryExecution(df.sparkSession, df.queryExecution.logical) {
  override protected def preparations: Seq[Rule[SparkPlan]] = {
    Seq(
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      CollapseCodegenStages(sparkSession.sessionState.conf))
  }
}
val plan = qe.executedPlan
val rdd: RDD[ColumnarBatch] = plan.executeColumnar()
rdd.map { ... }
{code}
Something like the following would be much easier to use:
{code:java}
val columnarDF: RDD[ColumnarBatch] = df.getColumnarDF()
// or df.mapColumnarBatch(...)
{code}
* (Not related to this JIRA.) XGBoost uses TBB mode, so we need to change task.cpus via the stage-level resource management APIs, which works well. But we still need an easy way to collect data from all task threads in the same executor, ideally without extra memory copies in the process. What we do now is shown below: we created a customized PartitionCoalescer that combines the DMatrix pointers held by the same executor and then concatenates them (a rough sketch of such a coalescer follows after this list). I'm thinking about a more general way to do this.
{code:java}
dmatrixPointerRDD.cache()
// Materialize the cache so each executor holds its own DMatrix pointers.
dmatrixPointerRDD.foreachPartition(_ => ())
val coalescedRDD = dmatrixPointerRDD.coalesce(1,
  partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))
coalescedRDD.mapPartitions { ... }
{code}
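
To make the second point more concrete, here is a minimal sketch of how a getColumnarDF() convenience could be layered on top of the QueryExecution recipe above. ColumnarSupport, ColumnarDataFrameOps, and getColumnarDF() are illustrative names, not existing Spark APIs, and the body relies on the same internal, version-specific classes as the snippet above.
{code:java}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{CollapseCodegenStages, PlanSubqueries, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical convenience wrapper; getColumnarDF is the API name proposed above,
// not an existing Spark method.
object ColumnarSupport {
  implicit class ColumnarDataFrameOps(df: DataFrame) {
    def getColumnarDF(): RDD[ColumnarBatch] = {
      // NOTE: sessionState is private[sql], so like the original snippet this code
      // has to live under the org.apache.spark.sql package (or obtain the conf some
      // other way), and the rule constructors differ across Spark releases.
      val qe = new QueryExecution(df.sparkSession, df.queryExecution.logical) {
        // Restrict the preparation rules so the row/columnar transition is never
        // inserted and the executed plan keeps producing ColumnarBatch output.
        override protected def preparations: Seq[Rule[SparkPlan]] = Seq(
          PlanSubqueries(sparkSession),
          EnsureRequirements(sparkSession.sessionState.conf),
          CollapseCodegenStages(sparkSession.sessionState.conf))
      }
      qe.executedPlan.executeColumnar()
    }
  }
}
{code}
With such a helper in scope, the per-job boilerplate would reduce to importing ColumnarSupport._ and calling df.getColumnarDF().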
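
The ExecutorInProcessCoalescePartitioner mentioned in the last point is not shown above. The following is a rough sketch of what such a coalescer could look like, built on Spark's public PartitionCoalescer / PartitionGroup developer API; the grouping key (the partition's first preferred location) and the class body are assumptions for illustration, not the actual implementation from the blog post.
{code:java}
import scala.collection.mutable
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Rough sketch (not the actual implementation): group the parent RDD's partitions
// by their first preferred location, so partitions held by the same executor host
// end up in a single coalesced partition. maxPartitions is ignored; the number of
// groups equals the number of distinct locations.
class ExecutorInProcessCoalescePartitioner extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val groups = mutable.LinkedHashMap.empty[String, PartitionGroup]
    parent.partitions.foreach { part =>
      // Group by the partition's first preferred location; a production version
      // would likely resolve the executor that actually cached each block.
      val loc = parent.preferredLocations(part).headOption
      val group = groups.getOrElseUpdate(loc.getOrElse(""), new PartitionGroup(loc))
      group.partitions += part
    }
    groups.values.toArray
  }
}
{code}
This only works as intended after the cache()/foreachPartition step above has materialized the DMatrix pointers on their executors, and the locality information is best-effort rather than guaranteed.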

> SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-24579
>                 URL: https://issues.apache.org/jira/browse/SPARK-24579
>             Project: Spark
>          Issue Type: Epic
>          Components: ML, PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Major
>              Labels: Hydrogen
>         Attachments: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange between Apache Spark and DL/AI Frameworks.pdf
>
> (see attached SPIP pdf for more details)
>
> At the crossroads of big data and AI, we see both the success of Apache Spark as a unified analytics engine and the rise of AI frameworks like TensorFlow and Apache MXNet (incubating). Both big data and AI are indispensable components to drive business innovation, and there have been multiple attempts from both communities to bring them together.
>
> We saw efforts from the AI community to implement data solutions for AI frameworks, like tf.data and tf.Transform. However, with 50+ data sources and built-in SQL, DataFrames, and Streaming features, Spark remains the community's choice for big data. This is why we saw many efforts to integrate DL/AI frameworks with Spark to leverage its power, for example the TFRecords data source for Spark, TensorFlowOnSpark, TensorFrames, etc. As part of Project Hydrogen, this SPIP takes a different angle at Spark + AI unification.
>
> None of these integrations are possible without exchanging data between Spark and the external DL/AI frameworks, and performance matters. However, there is no standard way to exchange data, so implementation and performance optimization efforts are fragmented. For example, TensorFlowOnSpark uses Hadoop InputFormat/OutputFormat for TensorFlow's TFRecords to load and save data and passes the RDD records to TensorFlow in Python, while TensorFrames converts Spark DataFrame Rows to/from TensorFlow Tensors using TensorFlow's Java API. How can we reduce the complexity?
>
> The proposal here is to standardize the data exchange interface (or format) between Spark and DL/AI frameworks and to optimize data conversion from/to this interface. DL/AI frameworks can then leverage Spark to load data from virtually anywhere without spending extra effort building complex data solutions, such as reading features from a production data warehouse or streaming model inference. Spark users can use DL/AI frameworks without learning their specific data APIs. And developers from both sides can work on performance optimizations independently, given that the interface itself doesn't introduce significant overhead.