I guess you're probably using Spark 1.5? Spark SQL does support schema merging, but we disabled it by default since 1.5 because it introduces extra performance costs (it's turned on by default in 1.4 and 1.3).

You may enable schema merging via either the Parquet data source specific option "mergeSchema":

  sqlContext.read.option("mergeSchema", "true").parquet(path)

or the global SQL option "spark.sql.parquet.mergeSchema":

  sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
  sqlContext.read.parquet(path)

Cheng

On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:

Dear Michael,

Thank you very much for your help.

I should have mentioned in my original email, I did try the sequence notation. It doesn’t seem to have the desired effect. Maybe I should say that each one of these files has a different schema. When I use that call, I’m not ending up with a data frame with columns from all of the files taken together, but just one of them. I’m tracing through the code trying to understand exactly what is happening with the Seq[String] call. Maybe you know? Is it trying to do some kind of schema merging?

Also, it seems that even if I could get it to work, it would require some parsing of the resulting schemas to find the invalid files. I would like to capture these errors on read.

The parquet files currently average about 60 MB in size, with min about 40 MB and max about 500 or so. I could coalesce, but they do correspond to logical entities and there are a number of use-case specific reasons to keep them separate.

Thanks,

Jordan

*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
*Cc:* user <user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files

Another note: for best performance you are going to want your parquet files to be pretty big (100s of mb). You could coalesce them and write them out for more efficient repeat querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust <mich...@databricks.com <mailto:mich...@databricks.com>> wrote:

    sqlContext.read.parquet
    
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
    takes lists of files.

    val fileList = sc.textFile("file_list.txt").collect() // this
    works but using spark is possibly overkill
    val dataFrame = sqlContext.read.parquet(fileList: _*)

    On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
    <jordan.tho...@accenture.com <mailto:jordan.tho...@accenture.com>>
    wrote:

        We are working with use cases where we need to do batch
        processing on a large
        number (hundreds of thousands) of Parquet files.  The
        processing is quite
        similar per file.  There are a many aggregates that are very
        SQL-friendly
        (computing averages, maxima, minima, aggregations on single
        columns with
        some selection criteria).  There are also some processing that
        is more
        advanced time-series processing (continuous wavelet transforms
        and the
        like).  This all seems like a good use case for Spark.

        But I'm having performance problems.  Let's take a look at
        something very
        simple, which simply checks whether the parquet files are
        readable.

        Code that seems natural but doesn't work:

        import scala.util.{Try, Success, Failure} val parquetFiles =
        sc.textFile("file_list.txt") val successes =
        parquetFiles.map(x => (x,
        Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x
        => x._1)

        My understanding is that this doesn't work because sqlContext
        can't be used
inside of a transformation like "map" (or inside an action). That it only
        makes sense in the driver.  Thus, it becomes a null reference
        in the above
        code, so all reads fail.

        Code that works:

        import scala.util.{Try, Success, Failure} val parquetFiles =
        sc.textFile("file_list.txt") val successes =
        parquetFiles.collect().map(x =>
        (x,
        Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x
        => x._1)


        This works because the collect() means that everything happens
        back on the
        driver.  So the sqlContext object makes sense and everything
        works fine.

        But it is slow.  I'm using yarn-client mode on a 6-node
        cluster with 17
        executors, 40 GB ram on driver, 19GB on executors.  And it
        takes about 1
minute to execute for 100 parquet files. Which is too long. Recall we need
        to do this across hundreds of thousands of files.

        I realize it is possible to parallelize after the read:

        import scala.util.{Try, Success, Failure} val parquetFiles =
        sc.textFile("file_list.txt") val intermediate_successes =
        parquetFiles.collect().map(x => (x,
        Try(sqlContext.read.parquet(x))))
        val dist_successes = sc.parallelize(successes) val successes =
        dist_successes.filter(_._2.isSuccess).map(x => x._1)


        But this does not improve performance substantially.  It seems the
        bottleneck is that the reads are happening sequentially.

        Is there a better way to do this?

        Thanks,
        Jordan




        --
        View this message in context:
        
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
        Sent from the Apache Spark User List mailing list archive at
        Nabble.com.

        ---------------------------------------------------------------------
        To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
        <mailto:user-unsubscr...@spark.apache.org>
        For additional commands, e-mail: user-h...@spark.apache.org
        <mailto:user-h...@spark.apache.org>


------------------------------------------------------------------------

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy.
______________________________________________________________________________________

www.accenture.com

Reply via email to