See if this is a right interpretation: * Hadoop's InputSplit has a getLocations <https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapred/InputSplit.html#getLocations()> method that in some cases exposes useful information about the underlying data locality. * Beam jobs may run on the same cluster as the HDFS storage nodes (e.g.), in which case it's useful to expose the locality to runners to assign mappers (e.g.) to be near the data.
In that case, I think it makes perfect sense to expose the `getLocations` on the actual HDFS sources. To do this, we would need to make the HDFS Source an actual BoundedSource with a getter for the locations -- rather than an anonymous inner class. See here: https://github.com/apache/ incubator-beam/blob/master/sdks/java/io/hdfs/src/main/ java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java#L180 If that's right, makes sense to me! Dan On Mon, Sep 26, 2016 at 12:55 PM, Amit Sela <[email protected]> wrote: > Thanks for the through response Dan, what you mentioned is very interesting > and would clearly benefit runners. > > I was actually talking about something more "old-school", and specific to > batch. > If running a job on YARN - via MapReduce, Spark, etc. - you'd prefer that > YARN would assign tasks working on splits locally. > > Spark does this for HDFS/HBase/S3: > https://github.com/apache/spark/blob/branch-1.6/core/src/ > main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L241 > . > > Since for most(?) open-source runners YARN is the preferred/popular > resource manager, and HDFS is the preferred filesystem, I was wondering if > that's something that could be shared across runners and not being > re-written per-runner. > I'm talking about obtaining the locations of the input splits, and passing > them to the runners to choose how to use them. > > I wonder if there's a need for that besides the Spark runner though, it's > only for batch.. I opened https://issues.apache.org/jira/browse/BEAM-673 > as > a "runner-spark" component for now. > > Thanks, > Amit > > > On Mon, Sep 26, 2016 at 10:39 PM Dan Halperin <[email protected] > > > wrote: > > > Hi Amit, > > > > Sorry to be late to the thread, but I've been traveling. I'm not sure I > > fully grokked the question, but here's one attempt at an answer: > > > > In general, any options on where a pipeline is executed should be > > runner-specific. One example: for Dataflow, we have the zone > > < > > https://github.com/apache/incubator-beam/blob/master/runners > /google-cloud-dataflow-java/src/main/java/org/apache/beam/ > runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167 > > > > > option, > > which can be used to control what GCE zone VMs are launched in. I could > > imagine similar things for Spark/Yarn, etc. > > > > I think your question may be a bit deeper: given a pipeline without such > > explicit configuration from the user, can a runner do something smart? I > > think the answer to that is also yes. Today, we have DisplayData and soon > > we will have the Runner API -- these expose in a standard way information > > about file paths, BigQuery tables, Bigtable clusters, Kafka clusters, > etc., > > that may be used by the pipeline. Once the Runner API is standardized and > > implemented, a runner ought to be able to inspect the metadata and say > > "hey, I see you're reading from this Kafka cluster, let's try to be near > > it". For example. > > > > Does that answer the question / did I miss something? > > > > Thanks, > > Dan > > > > On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela <[email protected]> wrote: > > > > > Generally this makes sense, though I thought that this is what > > > IOChannelFactory was (also) about, and eventually the runner needs to > > > facilitate the splitting/partitioning of the source, so I was wondering > > if > > > the source could have a generic mechanism for locality as well. > > > > > > On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson <[email protected]> > > > wrote: > > > > > > > I think the runners should. Each framework has put far more effort > into > > > > data locality than Beam should. Beam should just take advantage of > it. > > > > > > > > On Thu, Sep 22, 2016, 7:57 AM Amit Sela <[email protected]> > wrote: > > > > > > > > > Not where in the file, where in the cluster. > > > > > > > > > > Like you said - mapper - in MapReduce the mapper instance will > > *prefer* > > > > to > > > > > start on the same machine as the Node hosting it (unless that's > > > changed, > > > > > I've been out of touch with MR for a while...). > > > > > > > > > > And for Spark - > > > > > > > > > > > > > > https://databricks.gitbooks.io/databricks-spark-knowledge-ba > > > se/content/performance_optimization/data_locality.html > > > > > . > > > > > > > > > > As for Flink, it's a streaming-first engine (sort of the opposite > of > > > > Spark, > > > > > being a batch-first engine) so I *assume* they don't have this > notion > > > and > > > > > simply "stream" input. > > > > > > > > > > Dataflow - no idea... > > > > > > > > > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson < > > [email protected]> > > > > > wrote: > > > > > > > > > > > I've only ever seen that being used to figure out which file the > > > > > > runner/mapper/operation is working on. Otherwise, I haven't seen > > > those > > > > > > operations care where in the file they're working. > > > > > > > > > > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela <[email protected]> > > > > wrote: > > > > > > > > > > > > > Wouldn't it force all runners to implement this for all > > distributed > > > > > > > filesystems ? It's true that each runner has it's own > > > "partitioning" > > > > > > > mechanism, but I assume (maybe I'm wrong) that open-source > > runners > > > > use > > > > > > the > > > > > > > Hadoop InputFormat/InputSplit for that.. and the proper > > connectors > > > > for > > > > > > that > > > > > > > to run on top of s3/gs. > > > > > > > > > > > > > > If this is wrong, each runner should take care of it's own, but > > if > > > > not, > > > > > > we > > > > > > > could have a generic solution for runners, no ? > > > > > > > > > > > > > > Thanks, > > > > > > > Amit > > > > > > > > > > > > > > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré < > > > > [email protected]> > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Amit, > > > > > > > > > > > > > > > > as the purpose is to remove IOChannelFactory, then I would > > > suggest > > > > > it's > > > > > > > > a runner concern. The Read.Bounded should "locate" the > bundles > > > on a > > > > > > > > executor close to the read data (even if it's not always > > possible > > > > > > > > depending of the source). > > > > > > > > > > > > > > > > My $0.01 > > > > > > > > > > > > > > > > Regards > > > > > > > > JB > > > > > > > > > > > > > > > > On 09/22/2016 02:26 PM, Amit Sela wrote: > > > > > > > > > It's not new that batch pipeline can optimize on data > > locality, > > > > my > > > > > > > > question > > > > > > > > > is regarding this responsibility in Beam. > > > > > > > > > If runners should implement a generic Read.Bounded support, > > > > should > > > > > > they > > > > > > > > > also implement locating the input blocks ? or should it be > a > > > part > > > > > > > > > of IOChannelFactory implementations ? or another way to go > at > > > it > > > > > that > > > > > > > I'm > > > > > > > > > missing ? > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Amit. > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Jean-Baptiste Onofré > > > > > > > > [email protected] > > > > > > > > http://blog.nanthrax.net > > > > > > > > Talend - http://www.talend.com > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
