Steve is right that the S3 committer isn't a ParquetOutputCommitter. I
think that check exists to make sure Parquet writes _metadata summary
files to the output directory. But I think the **summary files are a bad
idea**, so we bypass that logic and use the committer directly if the
output path is in S3.
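
For context, the failing check appears to be Hadoop's three-argument `Configuration.getClass(name, default, xface)`, which throws a RuntimeException when the configured class is not assignable to the required type. The following is a minimal sketch that reproduces that behavior, assuming hadoop-common, hadoop-mapreduce-client-core, and parquet-hadoop are on the classpath. The object name `CommitterCheckDemo` and the helper `lookup` are made up for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.hadoop.ParquetOutputCommitter

import scala.util.Try

// Hypothetical demo object; not part of Spark, Parquet, or s3committer.
object CommitterCheckDemo {
  // The key behind SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key.
  val Key = "spark.sql.parquet.output.committer.class"

  // Mirrors the lookup quoted from ParquetFileFormat.prepareWrite: the
  // configured class must be a ParquetOutputCommitter (or a subclass).
  def lookup(conf: Configuration): Try[Class[_ <: ParquetOutputCommitter]] =
    Try(conf.getClass(Key, classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter]))

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)

    // A plain FileOutputCommitter is not a ParquetOutputCommitter,
    // so this lookup is expected to fail with a RuntimeException.
    conf.set(Key, classOf[FileOutputCommitter].getName)
    println(lookup(conf))

    // ParquetOutputCommitter itself satisfies the check.
    conf.set(Key, classOf[ParquetOutputCommitter].getName)
    println(lookup(conf))
  }
}
```

Any committer that extends FileOutputCommitter directly, rather than ParquetOutputCommitter, would fail the first lookup in the same way.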

Why are summary files a bad idea? Because they can easily get out of sync
with the real data files and cause correctness problems. There are two
reasons for using them, both optimizations to avoid reading all of the file
footers in a table. First, _metadata can be used to plan a job because it
has the row group offsets. But planning no longer reads all of the footers;
it uses regular Hadoop file splits instead. The second use is to get the
schema of a table more quickly, but this should be handled by a metastore
that tracks the latest schema. A metastore provides even faster access, a
more reliable schema, and can support schema evolution.
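
As a configuration sketch, summary files can be switched off through the Parquet job-summary property. This assumes the property name `parquet.enable.summary-metadata`, which Parquet releases of this era use; newer Parquet releases may use a different property, and some Spark versions may already default it to false. The application name and output path are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; the output path below is a placeholder.
object NoSummaryFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("no-summary-files")
      .master("local[*]")
      // The spark.hadoop. prefix copies the property into the Hadoop
      // configuration that the Parquet output committer reads.
      .config("spark.hadoop.parquet.enable.summary-metadata", "false")
      .getOrCreate()

    // With summary files disabled, the committer should not write
    // _metadata or _common_metadata next to the data files.
    spark.range(10).write.mode("overwrite").parquet("/tmp/no-summary-demo")

    spark.stop()
  }
}
```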

Even with the _metadata files, Spark has had to parallelize building a
table from Parquet files in S3 without a metastore, so I think this
requirement should be removed. In the meantime, you can probably just
build a version of the S3 committer that inherits from
ParquetOutputCommitter instead of FileOutputCommitter. That's probably the
easiest solution. Be sure you run the tests!
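
The shape of that workaround can be sketched as follows. This is only an outline of the type relationship, not the s3committer implementation: the class name `ParquetCompatibleCommitter` is hypothetical, and the body simply delegates to the stock behavior where a real S3 committer would substitute its own commit logic. The `(Path, TaskAttemptContext)` constructor is assumed to be the one Spark looks up when it instantiates the committer.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Hypothetical class; not part of s3committer, Parquet, or Spark.
// Extending ParquetOutputCommitter (itself a FileOutputCommitter) is what
// lets the class pass the type check in ParquetFileFormat.prepareWrite.
class ParquetCompatibleCommitter(outputPath: Path, context: TaskAttemptContext)
    extends ParquetOutputCommitter(outputPath, context) {

  // Placeholder: delegates to the inherited commit. A real S3 committer
  // would override the task and job commit methods with its own upload
  // logic here, the way it currently does on top of FileOutputCommitter.
  override def commitJob(jobContext: JobContext): Unit =
    super.commitJob(jobContext)
}
```

Such a class would be selected with the same committer-class setting shown in the quoted message, pointing at the new class name.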

rb

On Tue, Mar 28, 2017 at 3:17 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> > On 28 Mar 2017, at 05:20, sririshindra <sririshin...@gmail.com> wrote:
> >
> > Hi
> >
> > I have a job which saves a dataframe as parquet file to s3.
> >
> > I built a jar using your repository https://github.com/rdblue/
> s3committer.
> >
> > I added the following config to the Spark Session
> > config("spark.hadoop.spark.sql.parquet.output.committer.class",
> > "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
> >
> >
> > I submitted the job to spark 2.0.2 as follows
> >
> > ./bin/spark-submit --master local[*] --driver-memory 4G --jars
> > /home/rishi/Downloads/hadoop-aws-2.7.3.jar,/home/rishi/
> Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/
> s3committer/build/libs/s3committer-0.5.5.jar
> > --driver-library-path
> > /home/user/Downloads/hadoop-aws-2.7.3.jar,/home/user/
> Downloads/aws-java-sdk-1.7.4.jar,/home/user/Documents/
> s3committer/build/libs/s3committer-0.5.5.jar
> > --class main.streaming.scala.backupdatatos3.backupdatatos3Processorr
> > --packages
> > joda-time:joda-time:2.9.7,org.mongodb.mongo-hadoop:mongo-
> hadoop-core:1.5.2,org.mongodb:mongo-java-driver:3.3.0
> > /home/user/projects/backupjob/target/Backup-1.0-SNAPSHOT.jar
>
>
> The miracle of OSS is that you have the right to fix things; the curse is
> that only you get to fix your problems on a timescale that suits you.
>
>
> >
> >
> > I am getting the following runtime exception.
> > Exception in thread "main" java.lang.RuntimeException:
> > java.lang.RuntimeException: class
> > com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> > org.apache.parquet.hadoop.ParquetOutputCommitter
> >        at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> >        at
> > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.
> prepareWrite(ParquetFileFormat.scala:81)
> >        at
>
>
> here:
>     val committerClass =
>       conf.getClass(
>         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
>         classOf[ParquetOutputCommitter],
>         classOf[ParquetOutputCommitter])
>
>
> At a guess, Ryan's committer isn't a ParquetOutputCommitter.
>
> workarounds
>
> 1. Subclass ParquetOutputCommitter
> 2. Modify ParquetFileFormat to only look for a classOf[FileOutputCommitter];
> the ParquetOutputCommitter doesn't do anything other than optionally add a
> metadata file. As that is a performance killer on S3, you should have
> disabled that option already.
>
> #2 is easiest, with the time to rebuild Spark being the only overhead.
>
> HADOOP-13786  is sneaking in Ryan's work underneath things, but even there
> the ParquetFileFormat is going to have trouble. Which is odd, given my
> integration tests did appear to be writing things. I'll take that as a sign
> of coverage problems
>
>
>
>
> > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(
> FileFormatWriter.scala:108)
> >        at
> > org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationComm
> and.scala:101)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult$lzycompute(commands.scala:58)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.
> sideEffectResult(commands.scala:56)
> >        at
> > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(
> commands.scala:74)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:114)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$1.apply(SparkPlan.scala:114)
> >        at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(
> SparkPlan.scala:135)
> >        at
> > org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> >        at
> > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:
> 132)
> >        at
> > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> >        at
> > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(
> QueryExecution.scala:87)
> >        at
> > org.apache.spark.sql.execution.QueryExecution.
> toRdd(QueryExecution.scala:87)
> >        at
> > org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.
> scala:492)
> >        at
> > org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
> >        at
> > org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
> >        at
> > main.streaming.scala.backupdatatos3.backupdatatos3Processorr$.
> main(backupdatatos3Processorr.scala:229)
> >        at
> > main.streaming.scala.backupdatatos3.backupdatatos3Processorr.main(
> backupdatatos3Processorr.scala)
> >        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >        at
> > sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> >        at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >        at java.lang.reflect.Method.invoke(Method.java:498)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
> >        at
> > org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
> >        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:126)
> >        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> > Caused by: java.lang.RuntimeException: class
> > com.netflix.bdp.s3.S3PartitionedOutputCommitter not
> > org.apache.parquet.hadoop.ParquetOutputCommitter
> >        at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2221)
> >        ... 28 more
> >
> > Can you please point out my mistake?
> >
> > If possible can you give a working example of saving a dataframe as a
> > parquet file in s3.
> >
> >
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Output-Committers-
> for-S3-tp21033p21246.html
> > Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >
> >
>
>


-- 
Ryan Blue
Software Engineer
Netflix
