Hi Amit,

Sorry to be late to the thread, but I've been traveling. I'm not sure I fully grokked the question, but here's one attempt at an answer:
In general, any options on where a pipeline is executed should be runner-specific. One example: for Dataflow, we have the zone <https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167> option, which can be used to control what GCE zone VMs are launched in. I could imagine similar things for Spark/Yarn, etc.

I think your question may be a bit deeper: given a pipeline without such explicit configuration from the user, can a runner do something smart? I think the answer to that is also yes. Today, we have DisplayData and soon we will have the Runner API -- these expose in a standard way information about file paths, BigQuery tables, Bigtable clusters, Kafka clusters, etc., that may be used by the pipeline. Once the Runner API is standardized and implemented, a runner ought to be able to inspect the metadata and say, for example, "hey, I see you're reading from this Kafka cluster, let's try to be near it".

Does that answer the question / did I miss something?

Thanks,
Dan

On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela <[email protected]> wrote:

> Generally this makes sense, though I thought that this is what
> IOChannelFactory was (also) about, and eventually the runner needs to
> facilitate the splitting/partitioning of the source, so I was wondering if
> the source could have a generic mechanism for locality as well.
>
> On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson <[email protected]> wrote:
>
> > I think the runners should. Each framework has put far more effort into
> > data locality than Beam should. Beam should just take advantage of it.
> >
> > On Thu, Sep 22, 2016, 7:57 AM Amit Sela <[email protected]> wrote:
> >
> > > Not where in the file, where in the cluster.
> > >
> > > Like you said - mapper - in MapReduce the mapper instance will *prefer*
> > > to start on the same machine as the Node hosting it (unless that's
> > > changed, I've been out of touch with MR for a while...).
> > >
> > > And for Spark -
> > > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > >
> > > As for Flink, it's a streaming-first engine (sort of the opposite of
> > > Spark, being a batch-first engine) so I *assume* they don't have this
> > > notion and simply "stream" input.
> > >
> > > Dataflow - no idea...
> > >
> > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson <[email protected]> wrote:
> > >
> > > > I've only ever seen that being used to figure out which file the
> > > > runner/mapper/operation is working on. Otherwise, I haven't seen those
> > > > operations care where in the file they're working.
> > > >
> > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela <[email protected]> wrote:
> > > >
> > > > > Wouldn't it force all runners to implement this for all distributed
> > > > > filesystems? It's true that each runner has its own "partitioning"
> > > > > mechanism, but I assume (maybe I'm wrong) that open-source runners use
> > > > > the Hadoop InputFormat/InputSplit for that, and the proper connectors
> > > > > for that to run on top of s3/gs.
> > > > >
> > > > > If this is wrong, each runner should take care of its own, but if not,
> > > > > we could have a generic solution for runners, no?
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré <[email protected]> wrote:
> > > > >
> > > > > > Hi Amit,
> > > > > >
> > > > > > as the purpose is to remove IOChannelFactory, then I would suggest
> > > > > > it's a runner concern.
> > > > > > The Read.Bounded should "locate" the bundles on an executor close
> > > > > > to the read data (even if it's not always possible, depending on
> > > > > > the source).
> > > > > >
> > > > > > My $0.01
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > > > > > It's not new that batch pipelines can optimize on data locality;
> > > > > > > my question is regarding this responsibility in Beam.
> > > > > > > If runners should implement a generic Read.Bounded support, should
> > > > > > > they also implement locating the input blocks? Or should it be a
> > > > > > > part of IOChannelFactory implementations? Or another way to go at
> > > > > > > it that I'm missing?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Amit.
> > > > > >
> > > > > > --
> > > > > > Jean-Baptiste Onofré
> > > > > > [email protected]
> > > > > > http://blog.nanthrax.net
> > > > > > Talend - http://www.talend.com
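
For what it's worth, here is a rough sketch of the two ideas from the top of this message: an explicit, runner-specific zone option wins, and otherwise the runner looks at pipeline metadata and tries to place workers near the data. It is plain Python and purely illustrative. `choose_zone`, `RESOURCE_ZONES`, the zone names, and the resource addresses are all made up for this example, and the list of key/value dicts is only a simplified stand-in for the kind of information DisplayData exposes. None of this is an actual Beam or Dataflow API.

```python
from typing import Dict, List, Optional

# Hypothetical lookup table: which zone each external resource lives in.
# A real runner would have to discover this somehow; here it is hard-coded.
RESOURCE_ZONES: Dict[str, str] = {
    "kafka://broker-1.example.internal:9092": "us-central1-f",
    "gs://example-bucket/input/*": "europe-west1-b",
}


def choose_zone(
    user_zone: Optional[str],
    display_data: List[Dict[str, str]],
    default_zone: str = "us-central1-a",
) -> str:
    """Pick a worker zone (hypothetical helper, not a Beam API).

    Order of preference:
      1. an explicit zone set by the user (the runner-specific option),
      2. the zone of the first resource in the metadata that we recognize,
      3. a default zone.
    """
    if user_zone:
        return user_zone
    for item in display_data:
        zone = RESOURCE_ZONES.get(item.get("value", ""))
        if zone:
            return zone
    return default_zone


# Simplified stand-in for metadata a pipeline might expose.
metadata = [
    {"key": "topic", "value": "events"},
    {"key": "bootstrapServers", "value": "kafka://broker-1.example.internal:9092"},
]

print(choose_zone(None, metadata))            # no user option: follow the data
print(choose_zone("asia-east1-a", metadata))  # user option takes precedence
```

The point of the ordering is that the generic, metadata-driven guess stays a fallback, so each runner remains free to apply whatever locality logic it already has.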
