sqlContext.read.parquet
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
takes a variable number of paths (paths: String*), so the whole file list can go into a single call.

val fileList = sc.textFile("file_list.txt").collect() // this works, but using Spark is possibly overkill
val dataFrame = sqlContext.read.parquet(fileList: _*)
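
If the file list fits comfortably on the driver, you can also skip the Spark job entirely and read the list with plain Scala. A minimal sketch, assuming file_list.txt sits on the driver's local filesystem with one path per line (fileList and dataFrame are just placeholder names):

import scala.io.Source

// Read the list of parquet paths with plain Scala instead of a Spark job.
val src = Source.fromFile("file_list.txt")
val fileList = try src.getLines().toList finally src.close()

// One read over all the paths; Spark parallelizes the scan itself.
val dataFrame = sqlContext.read.parquet(fileList: _*)

Either way, the point is a single read over all the paths rather than one read.parquet call per file on the driver.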

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas <jordan.tho...@accenture.com>
wrote:

> We are working with use cases where we need to do batch processing on a
> large number (hundreds of thousands) of Parquet files.  The processing is
> quite similar per file.  There are many aggregates that are very
> SQL-friendly (computing averages, maxima, minima, aggregations on single
> columns with some selection criteria).  There is also some more advanced
> time-series processing (continuous wavelet transforms and the like).  This
> all seems like a good use case for Spark.
>
> But I'm having performance problems.  Let's take a look at something very
> simple, which simply checks whether the parquet files are readable.
>
> Code that seems natural but doesn't work:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles
>   .map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess)
>   .map(x => x._1)
>
> My understanding is that this doesn't work because sqlContext can't be used
> inside of a transformation like "map" (or inside an action); it only makes
> sense on the driver.  Because it isn't shipped to the executors, it becomes
> a null reference in the above code, so all reads fail.
>
> Code that works:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles.collect()
>   .map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess)
>   .map(x => x._1)
>
>
> This works because the collect() means that everything happens back on the
> driver, where the sqlContext reference is valid, so everything works fine.
>
> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
> executors, 40 GB RAM on the driver and 19 GB on each executor.  And it
> takes about 1 minute to execute for 100 Parquet files, which is too long.
> Recall we need to do this across hundreds of thousands of files.
>
> I realize it is possible to parallelize after the read:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val intermediate_successes = parquetFiles.collect()
>   .map(x => (x, Try(sqlContext.read.parquet(x))))
> val dist_successes = sc.parallelize(intermediate_successes)
> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>
>
> But this does not improve performance substantially.  It seems the
> bottleneck is that the reads are happening sequentially.
>
> Is there a better way to do this?
>
> Thanks,
> Jordan
>
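For the readability check itself, another option (a rough, untested sketch, not something from this thread) is to distribute the paths and verify each file's Parquet magic bytes on the executors with the Hadoop FileSystem API, so sqlContext never has to leave the driver:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

// Distribute the paths and do a cheap "is this a Parquet file?" check on the
// executors, without touching sqlContext.  Parquet files start with the
// magic bytes "PAR1".
val paths = sc.textFile("file_list.txt")
val readable = paths.mapPartitions { iter =>
  val conf = new Configuration() // Hadoop config from the executor classpath
  iter.map { p =>
    val ok = Try {
      val fs = FileSystem.get(new URI(p), conf)
      val in = fs.open(new Path(p))
      try {
        val magic = new Array[Byte](4)
        in.readFully(magic)
        new String(magic, "US-ASCII") == "PAR1"
      } finally {
        in.close()
      }
    }.getOrElse(false)
    (p, ok)
  }
}
val successes = readable.filter(_._2).map(_._1)

Note that this only checks the header, so a truncated file would still pass; a stricter check would have to read the footer as well. But it keeps the per-file work on the executors instead of looping on the driver.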
