Hi Amit,

Sorry to be late to the thread, but I've been traveling. I'm not sure I
fully grokked the question, but here's one attempt at an answer:

In general, any options on where a pipeline is executed should be
runner-specific. One example: for Dataflow, we have the zone option
<https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java#L167>,
which can be used to control which GCE zone VMs are launched in. I could
imagine similar things for Spark/Yarn, etc.
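
To make the "runner-specific" part concrete, here is a rough sketch in plain
Java. It is not the Beam API: SketchOptions and SketchDataflowOptions are
made-up names, and the zone value in the usage note is only an example. The
point is just that the generic options bag knows nothing about zones, and
only the runner's own view interprets that key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical runner-agnostic options bag; a stand-in, not a Beam type. */
class SketchOptions {
    private final Map<String, String> values = new HashMap<>();

    /** Stores a raw option and returns this for chaining. */
    SketchOptions set(String key, String value) {
        values.put(key, value);
        return this;
    }

    /** Returns the raw option if the user supplied one. */
    Optional<String> get(String key) {
        return Optional.ofNullable(values.get(key));
    }
}

/** Hypothetical runner-specific view: only this runner understands "zone". */
class SketchDataflowOptions {
    static final String ZONE_KEY = "zone"; // assumed key name, for illustration

    private final SketchOptions options;

    SketchDataflowOptions(SketchOptions options) {
        this.options = options;
    }

    /** Zone to launch workers in, or the fallback when the user set nothing. */
    String zoneOr(String fallback) {
        return options.get(ZONE_KEY).orElse(fallback);
    }
}
```

With this, new SketchDataflowOptions(new SketchOptions().set("zone",
"us-central1-f")).zoneOr("runner-default") yields the user's zone, and an
empty SketchOptions yields the fallback. A Spark or YARN runner would define
its own view with its own keys.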

I think your question may be a bit deeper: given a pipeline without such
explicit configuration from the user, can a runner do something smart? I
think the answer to that is also yes. Today, we have DisplayData and soon
we will have the Runner API -- these expose, in a standard way, information
about the file paths, BigQuery tables, Bigtable clusters, Kafka clusters,
etc., that the pipeline may use. Once the Runner API is standardized and
implemented, a runner ought to be able to inspect the metadata and say,
for example, "hey, I see you're reading from this Kafka cluster, let's try
to be near it".
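
As a rough illustration of that kind of decision, here is a sketch in plain
Java. Everything in it is hypothetical: SourceInfo and LocalityPlanner are
made-up names, not DisplayData or Runner API types, and "location" is just a
string such as a zone or rack name. It assumes one simple policy: an explicit
user setting wins, otherwise the runner follows the first source that reports
a location, otherwise it uses its own default.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical description of one input a pipeline reads; not a Beam type. */
class SourceInfo {
    final String kind;     // e.g. "kafka" or "file"
    final String location; // where the data lives, or null if unknown

    SourceInfo(String kind, String location) {
        this.kind = kind;
        this.location = location;
    }
}

/** Hypothetical runner-side helper that picks where to execute. */
class LocalityPlanner {
    /**
     * Explicit user configuration wins. Otherwise use the location of the
     * first source that reports one. Otherwise use the runner's default.
     */
    static String chooseLocation(
            Optional<String> userChoice, List<SourceInfo> sources, String runnerDefault) {
        if (userChoice.isPresent()) {
            return userChoice.get();
        }
        for (SourceInfo source : sources) {
            if (source.location != null && !source.location.isEmpty()) {
                return source.location;
            }
        }
        return runnerDefault;
    }
}
```

A real runner would likely weigh sources by size or cost rather than take the
first one; the ordering rule here is only the simplest thing that shows the
idea.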

Does that answer the question / did I miss something?

Thanks,
Dan

On Thu, Sep 22, 2016 at 8:29 AM, Amit Sela <[email protected]> wrote:

> Generally this makes sense, though I thought that this is what
> IOChannelFactory was (also) about, and eventually the runner needs to
> facilitate the splitting/partitioning of the source, so I was wondering if
> the source could have a generic mechanism for locality as well.
>
> On Thu, Sep 22, 2016 at 6:11 PM Jesse Anderson <[email protected]>
> wrote:
>
> > I think the runners should. Each framework has put far more effort into
> > data locality than Beam should. Beam should just take advantage of it.
> >
> > On Thu, Sep 22, 2016, 7:57 AM Amit Sela <[email protected]> wrote:
> >
> > > Not where in the file, where in the cluster.
> > >
> > > Like you said - mapper - in MapReduce the mapper instance will *prefer*
> > > to start on the same machine as the Node hosting it (unless that's
> > > changed, I've been out of touch with MR for a while...).
> > >
> > > And for Spark -
> > >
> > >
> > > https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
> > > .
> > >
> > > As for Flink, it's a streaming-first engine (sort of the opposite of
> > > Spark, being a batch-first engine) so I *assume* they don't have this
> > > notion and simply "stream" input.
> > >
> > > Dataflow - no idea...
> > >
> > > On Thu, Sep 22, 2016 at 5:45 PM Jesse Anderson <[email protected]>
> > > wrote:
> > >
> > > > I've only ever seen that being used to figure out which file the
> > > > runner/mapper/operation is working on. Otherwise, I haven't seen
> > > > those operations care where in the file they're working.
> > > >
> > > > On Thu, Sep 22, 2016 at 5:57 AM Amit Sela <[email protected]>
> > > > wrote:
> > > >
> > > > > Wouldn't it force all runners to implement this for all distributed
> > > > > filesystems? It's true that each runner has its own "partitioning"
> > > > > mechanism, but I assume (maybe I'm wrong) that open-source runners
> > > > > use the Hadoop InputFormat/InputSplit for that, and the proper
> > > > > connectors for that to run on top of s3/gs.
> > > > >
> > > > > If this is wrong, each runner should take care of its own, but if
> > > > > not, we could have a generic solution for runners, no?
> > > > >
> > > > > Thanks,
> > > > > Amit
> > > > >
> > > > > On Thu, Sep 22, 2016 at 3:30 PM Jean-Baptiste Onofré <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi Amit,
> > > > > >
> > > > > > as the purpose is to remove IOChannelFactory, then I would
> > > > > > suggest it's a runner concern. The Read.Bounded should "locate"
> > > > > > the bundles on an executor close to the read data (even if it's
> > > > > > not always possible depending on the source).
> > > > > >
> > > > > > My $0.01
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > > On 09/22/2016 02:26 PM, Amit Sela wrote:
> > > > > > > It's not new that batch pipelines can optimize on data locality;
> > > > > > > my question is regarding this responsibility in Beam.
> > > > > > > If runners should implement a generic Read.Bounded support,
> > > > > > > should they also implement locating the input blocks? Or should
> > > > > > > it be a part of IOChannelFactory implementations? Or is there
> > > > > > > another way to go at it that I'm missing?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Amit.
> > > > > > >
> > > > > >
> > > > > > --
> > > > > > Jean-Baptiste Onofré
> > > > > > [email protected]
> > > > > > http://blog.nanthrax.net
> > > > > > Talend - http://www.talend.com
> > > > > >
> > > > >
> > > >
> > >
> >
>
