sqlContext.read.parquet <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258> accepts multiple paths (it is varargs), so you can pass the whole file list in one call:
    val fileList = sc.textFile("file_list.txt").collect() // this works, but using Spark here is possibly overkill
    val dataFrame = sqlContext.read.parquet(fileList: _*)

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas <jordan.tho...@accenture.com> wrote:

> We are working with use cases where we need to do batch processing on a
> large number (hundreds of thousands) of Parquet files. The processing is
> quite similar per file. There are many aggregates that are very
> SQL-friendly (computing averages, maxima, minima, aggregations on single
> columns with some selection criteria). There is also some processing that
> is more advanced time-series work (continuous wavelet transforms and the
> like). This all seems like a good use case for Spark.
>
> But I'm having performance problems. Let's look at something very simple,
> which just checks whether the Parquet files are readable.
>
> Code that seems natural but doesn't work:
>
>   import scala.util.{Try, Success, Failure}
>
>   val parquetFiles = sc.textFile("file_list.txt")
>   val successes = parquetFiles
>     .map(x => (x, Try(sqlContext.read.parquet(x))))
>     .filter(_._2.isSuccess)
>     .map(x => x._1)
>
> My understanding is that this doesn't work because sqlContext can't be
> used inside a transformation like "map" (or inside an action); it only
> makes sense on the driver. It becomes a null reference in the above code,
> so all reads fail.
>
> Code that works:
>
>   import scala.util.{Try, Success, Failure}
>
>   val parquetFiles = sc.textFile("file_list.txt")
>   val successes = parquetFiles.collect()
>     .map(x => (x, Try(sqlContext.read.parquet(x))))
>     .filter(_._2.isSuccess)
>     .map(x => x._1)
>
> This works because the collect() means everything happens back on the
> driver, so the sqlContext reference is valid and everything works fine.
>
> But it is slow. I'm using yarn-client mode on a 6-node cluster with 17
> executors, 40 GB RAM on the driver and 19 GB on the executors. It takes
> about 1 minute to execute for 100 Parquet files, which is too long.
> Recall we need to do this across hundreds of thousands of files.
>
> I realize it is possible to parallelize after the read:
>
>   import scala.util.{Try, Success, Failure}
>
>   val parquetFiles = sc.textFile("file_list.txt")
>   val intermediate_successes = parquetFiles.collect()
>     .map(x => (x, Try(sqlContext.read.parquet(x))))
>   val dist_successes = sc.parallelize(intermediate_successes)
>   val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>
> But this does not improve performance substantially. It seems the
> bottleneck is that the reads are happening sequentially.
>
> Is there a better way to do this?
>
> Thanks,
> Jordan
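For the readability check itself, the per-file reads can also be made concurrent on the driver without involving the executors. A minimal, untested sketch, assuming plain Scala parallel collections (.par is my addition, not from the thread) and that sqlContext is safe to call from multiple driver threads (worth verifying on your Spark version); file_list.txt is as in your example:

    import scala.util.Try

    // Read the path list on the driver.
    val fileList = sc.textFile("file_list.txt").collect()

    // Attempt each read concurrently via a parallel collection; a path
    // counts as "readable" if loading its schema does not throw.
    val readable = fileList.par
      .filter(path => Try(sqlContext.read.parquet(path)).isSuccess)
      .seq

    // Single varargs read over the surviving paths; Spark then
    // parallelizes the actual scan across executors.
    val dataFrame = sqlContext.read.parquet(readable: _*)

The driver-side reads are still the bottleneck, but at least they overlap instead of running one at a time.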