[
https://issues.apache.org/jira/browse/SPARK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985040#comment-15985040
]
Steve Loughran commented on SPARK-7481:
---------------------------------------
(This is a fairly long comment, but it tries to summarise the entire state of
interaction with object stores, especially S3A on Hadoop 2.8+. Azure is simpler;
GCS is Google's problem; Swift is not used very much.)
If you look at object stores & Spark (or indeed any code which uses a
filesystem as the source and destination of work), the problems can
generally be grouped into a few categories.
h3. Foundational: talking to the object stores
Classpath & execution: can you wire the JARs up? A longstanding issue in ASF
Spark releases (SPARK-5348, SPARK-12557). This was exacerbated by the movement
of s3n:// to the hadoop-aws package (FWIW, I hadn't noticed that move; I'd have
blocked it if I'd been paying attention). This includes transitive problems
(SPARK-11413).
Credential propagation. Spark's env var propagation is pretty cute here;
SPARK-19739 picks up {{AWS_SESSION_TOKEN}} too. Diagnostics on failure are a
real pain.
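For illustration only, roughly what the user-side wiring looks like — the artifact coordinates and version here are assumptions and must match the Hadoop version Spark was built against, so treat this as a sketch rather than a recommendation:

```shell
# Sketch only: version is an assumption; match it to your Hadoop build.
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_SESSION_TOKEN=...   # session token propagation needs SPARK-19739

spark-submit \
  --packages org.apache.hadoop:hadoop-aws:2.8.0 \
  ...
```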
h3. Observable Inconsistencies leading to Data loss
These are generally the places where the metaphor "it's just a filesystem"
fails. They are bad because they often "just work", especially in dev & test
with small datasets, and when they go wrong, they can fail by generating bad
results *and nobody notices*.
* Expectations of consistent listing of "directories". S3Guard (HADOOP-13345)
deals with this, as can Netflix's S3mper and AWS's premium DynamoDB-backed S3
storage.
* Expectations on the transactional nature of directory renames, the core
atomic commit operation against full filesystems.
* Expectations that when things are deleted they go away. This does become
visible sometimes, usually in checks for a destination not existing
(SPARK-19013)
* Expectations that write-in-progress data is visible/flushed, that {{close()}}
is low cost. SPARK-19111.
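To make the listing-inconsistency failure mode concrete, here is a toy sketch — a hypothetical class, not any real S3 or Hadoop API — of a store whose GETs are consistent but whose listings lag behind deletes. This is exactly the shape of bug that "just works" on small test runs and then silently corrupts results:

```python
# Toy model of an eventually consistent store: GET/HEAD see the truth,
# LIST sees a stale view. Illustration only, not real S3 semantics.
class EventuallyConsistentStore:
    def __init__(self):
        self.objects = set()   # current truth (GET/HEAD are consistent)
        self.listing = set()   # stale listing view

    def put(self, key):
        self.objects.add(key)
        self.listing.add(key)

    def delete(self, key):
        self.objects.discard(key)   # the listing has not caught up yet

    def get_status(self, key):
        return key in self.objects

    def list(self, prefix):
        return {k for k in self.listing if k.startswith(prefix)}

    def propagate(self):
        self.listing = set(self.objects)   # consistency catches up, eventually
```

A destination-not-exists check that probes with a GET passes, while a later listing of the same path still shows the deleted object — two "correct" calls, contradictory answers.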
Committing pretty much combines all of these, see below for more details.
h3. Aggressively bad performance
That's the mismatch between what the object store offers, what the apps expect,
and the metaphor work in the Hadoop FileSystem implementations which, in
trying to hide the conceptual mismatch, can actually amplify the problem.
Example: Directory tree scanning at the start of a query. The mock directory
structure allows callers to do treewalks, when really a full list of all
children can be done as a direct O(1) call. SPARK-17159 covers some of this for
scanning directories in Spark Streaming, but there's a hidden tree walk in
every call to {{FileSystem.globStatus()}} (HADOOP-13371). Given how S3Guard
transforms this treewalk, and that you need it for consistency anyway, it's
probably the best solution for now. Although I have a PoC which does a full
List **/* followed by a filter, that's not viable when you have a wide, deep
tree and do need to prune aggressively.
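The arithmetic can be sketched as follows, with hypothetical helper names: a treewalk pays one LIST call per inferred "directory", while a flat paged LIST pays one call per page of keys (1000 per page on S3):

```python
# Sketch: treewalk vs flat listing against a flat key space where
# "directories" are only inferred from "/" separators.
# Illustrative model, not the S3A client.
def treewalk_calls(keys):
    dirs = set()
    for key in keys:
        parts = key.split("/")[:-1]          # drop the file name
        for i in range(1, len(parts) + 1):
            dirs.add("/".join(parts[:i]))    # every ancestor "directory"
    return len(dirs) + 1                     # one LIST per dir, plus the root

def flat_list_calls(keys, page_size=1000):
    return max(1, -(-len(keys) // page_size))   # ceiling division
```

For a table with 10 partition directories of 5 files each, the treewalk costs 12 LISTs where a single flat LIST would do; the gap grows with tree width and depth.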
Checkpointing to object stores is similar: it's generally not dangerous to do
the write+rename; it just adds the copy overhead, consistency issues
notwithstanding.
h3. Suboptimal code.
There are opportunities for speedup, but if it's not on the critical path, it's
not worth the hassle. That said, as every call to {{getFileStatus()}} can take
hundreds of millis, things get onto the critical path quite fast. Examples:
checking for a file existing before calling {{fs.delete(path)}} (which is
always a no-op if the destination path isn't there), and the equivalent on
mkdirs: {{if (!fs.exists(path)) fs.mkdirs(path)}}. Hadoop 3.0 will help steer
people onto the path of righteousness there by deprecating a couple of methods
which encourage inefficiencies (isFile/isDir).
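The round-trip cost of the redundant-probe anti-pattern can be sketched with a hypothetical {{StubFs}} (not the real Hadoop FileSystem API) that counts calls, each of which stands in for an HTTP round trip of hundreds of ms:

```python
# Hypothetical stub filesystem: each operation counts one round trip.
class StubFs:
    def __init__(self):
        self.calls = 0
        self.paths = set()

    def exists(self, p):
        self.calls += 1
        return p in self.paths

    def delete(self, p):
        self.calls += 1
        self.paths.discard(p)   # already a no-op on a missing path

    def mkdirs(self, p):
        self.calls += 1
        self.paths.add(p)

def slow_delete(fs, p):   # anti-pattern: two round trips when the path exists
    if fs.exists(p):
        fs.delete(p)

def fast_delete(fs, p):   # same outcome, one round trip
    fs.delete(p)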
h3. The commit problem
The full commit problem combines all of these: you need a consistent listing of
source data, your deleted destination path mustn't appear in listings, the
commit of each task must promote the task's work to the pending output of the
job, and a task abort must leave no trace of it. The final job commit must place
data into the final destination; again, a job abort must not make any output
visible. There's some ambiguity about what happens if task and job commits fail;
generally the safest answer is "abort everything". Furthermore, nobody has any
idea what to do if an {{abort()}} raises exceptions. Oh, and all of this must
be fast. Spark is no better or worse here than the core MapReduce committers,
or Hive's.
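The contract above, reduced to an in-memory sketch — the names are illustrative, not Spark's {{HadoopMapReduceCommitProtocol}} or Hadoop's committer API:

```python
# In-memory sketch of the commit contract: task output is only "pending"
# until job commit; aborts leave no visible trace. Illustrative names only.
class InMemoryCommit:
    def __init__(self):
        self.pending = {}     # task id -> promoted task output
        self.published = []   # visible only after job commit

    def commit_task(self, task_id, files):
        self.pending[task_id] = files          # promote to pending job output

    def abort_task(self, task_id):
        self.pending.pop(task_id, None)        # must leave no trace

    def commit_job(self):
        for files in self.pending.values():
            self.published.extend(files)       # publish everything pending
        self.pending = {}

    def abort_job(self):
        self.pending = {}                      # nothing ever becomes visible
```

The hard part is that against a real object store every step here is a sequence of non-atomic remote calls, and "fast" and "leaves no trace on failure" pull in opposite directions.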
Spark generally uses the Hadoop {{FileOutputFormat}} via the
{{HadoopMapReduceCommitProtocol}}, directly or indirectly (e.g.
{{ParquetOutputFormat}}), extracting its committer and casting it to
{{FileOutputCommitter}}, primarily to get a working directory. This committer
assumes the destination is a consistent FS and uses renames when promoting task
and job output, assuming rename is so fast that it doesn't even bother to log
an "about to rename" message. Hence the recurrent Stack Overflow question "why
does my S3 job hang at the end with no data being uploaded and nothing logged?":
it's blocked while data is copied inside S3 at ~10MB/s. The direct output
committer addressed the rename delays, but at the expense of the requirements
of "commit" — nothing visible until the job is completed — hence its deletion
in SPARK-10063. Using the "v2 commit algorithm" is somewhat faster (SPARK-20107),
but still prone to generating bad results in the presence of list inconsistency.
Hadoop's {{FileOutputCommitter}} is horribly convoluted code with two different
commit strategies intermingled into a co-recursive nightmare that doesn't make
sense, even when stepped through line by line. Believe me. Trying to subclass
it is an exercise in pain and suffering.
Netflix have had to do exactly that with their [S3
committer|https://github.com/rdblue/s3committer]. This is in use in production,
saves minutes on queries and is resilient to failure. Here work is written to a
local dir, copied in a multipart upload direct to the destination directory, an
upload which is only completed when the job is committed. There's the delay for
uploads in task commit, but job commit is a parallelizable set of short POSTs.
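The staging trick can be sketched like this — a toy model of the idea, not the real S3 multipart-upload API: task commit uploads the parts but defers the "complete" POST, and job commit is just the short completes.

```python
# Toy model: uploaded parts stay invisible until the upload is completed.
# Illustration only, not the real S3 multipart-upload API.
class ToyStore:
    def __init__(self):
        self.visible = set()
        self.pending_uploads = {}   # key -> uploaded parts, not yet visible

    def upload_part(self, key, part):          # task commit: the slow bit
        self.pending_uploads.setdefault(key, []).append(part)

    def complete_upload(self, key):            # job commit: one short POST
        self.pending_uploads.pop(key)
        self.visible.add(key)

    def abort_upload(self, key):               # abort: nothing ever visible
        self.pending_uploads.pop(key, None)
```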
The (incomplete) S3Guard "magic" committer does some things in the FS, namely
remapping a path of the form {{s3a://data/.magic/job_0001_001/task001}} to a
multipart write to {{s3a://dest}}, saving the data for the final commit into
that .magic dir and then completing things in task commit. I've stopped work on
that, pulling in the Netflix committer as the "Staging Committer" instead. Why?
They've been using it, so it has the lovely attribute of "works". Also, it
doesn't need the consistent FS of S3Guard, so it works with an inconsistent S3
bucket as a destination of work (not as a source though, due to listing
inconsistencies).
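The remapping can be sketched as a pure path transformation — simplified and hypothetical, since the real layout and semantics are being defined in HADOOP-13786:

```python
# Sketch: a write under a ".magic" job/task directory is redirected to the
# real destination key. Simplified; not the actual S3A magic-path layout.
def remap_magic(path):
    parts = path.split("/")
    if ".magic" not in parts:
        return path                   # a normal path passes through untouched
    i = parts.index(".magic")
    # drop the ".magic/<job>/<task>" segments, keep the destination prefix
    return "/".join(parts[:i] + parts[i + 3:])
```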
See [S3A
Committer|https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md]
for the details, and follow HADOOP-13786 for the development work. If you can
get your classpath right, this already works at "demo scale" for Spark writing
everything but Parquet data (ORC, Avro, CSV, ...). There's a bit of
hard-codedness in the {{ParquetFileFormat}} setup which stops Parquet here, but
that's tractable in about 10 LoC.
That is: if you get your CP right and have Hadoop trunk + patches, you get O(1)
zero-rename commits to S3 in Spark without touching any of Spark's source code.
You still have to deal with the consistency problem on followup queries though,
so either run against a consistent S3 implementation, or run with S3Guard.
Other strategies for addressing the commit problem:
* Databricks are doing commit-as-a-service within AWS; I've got no details
there other than the talk @ Spark Summit East.
* Hive: see HIVE-14269. Long term they're thinking of moving off a simple
"listFiles() as the index of record" and using some manifest file which can be
written in a short PUT and read similarly (though it needs a story w.r.t.
update inconsistency).
* Azure WASB is a consistent store with fast rename. Works out of the box :)
h3. Putting it all together
Don't use S3 as the destination of your work unless you have something
supporting the commit. For that: get involved with HADOOP-13786. Classpaths:
yes, that needs to be done somehow.
> Add spark-hadoop-cloud module to pull in object store support
> -------------------------------------------------------------
>
> Key: SPARK-7481
> URL: https://issues.apache.org/jira/browse/SPARK-7481
> Project: Spark
> Issue Type: Improvement
> Components: Build
> Affects Versions: 2.1.0
> Reporter: Steve Loughran
>
> To keep the s3n classpath right, to add s3a, swift & azure, the dependencies
> of spark in a 2.6+ profile need to add the relevant object store packages
> (hadoop-aws, hadoop-openstack, hadoop-azure)