Note that the Runner API will make both the problem and the solution
cross-runner, but today the problem exists separately for each runner. So a
near-term solution makes sense as long as it is either forward-looking
(broadcasting runner-provided info to user fns) or has a TTL (putting extra
stuff into BoundedSource).

I also don't love PipelineOptions-as-broadcast-parameters, but I foresee
that we will eventually benefit from a generic mechanism along these lines.
I wouldn't want to rush into anything that we can't change. This should
probably be designed first in the Fn API (the runner sending metadata at
some level of granularity to the SDK harness), with the SDK then translating
that data idiomatically for the user Fn.

As for the alternative of adding a context to selected BoundedSource
methods, I can get behind it, with the strong caveat that my support is only
because the approach is already "deprecated". We have explicitly moved away
from it in DoFn, and it is only because of time constraints that we have not
done the same for CombineFn. Parameter buckets are an OK way to allow
backwards-compatible (for pipeline authors, not runner authors) extension
of parameter lists in interfaces that users implement, but we have
developed a much better way. (At the generic level of the Fn API such
distinctions don't exist; this point is largely about the Java SDK itself.)

With SplittableDoFn, all sources and sinks will be DoFns, so the
extensibility of DoFn applies and is better than a context / kwargs-like
parameter bucket. This allows all the proposed solutions to coexist easily
while being invisible in a pipeline when not required, yet trivially
discoverable by runner authors pre-Fn API. (Post-Fn API, runners will ship
the parameters generically and the SDK will deliver them in a way that the
runner neither knows nor cares about.)
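
To make this concrete, a minimal sketch in today's Java SDK style (the
RunnerMetadata mentioned in the comment is hypothetical, purely for
illustration):

    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow window) {
        // Parameters are matched by type, so the SDK can later accept new
        // parameter types (say, a hypothetical RunnerMetadata) without
        // breaking existing DoFns or growing a context object.
        c.output(c.element() + " @ " + window.maxTimestamp());
      }
    })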

If this is not explicitly intended as a short-term measure to get us through
until we have SDF, then we should instead consider upgrading BoundedSource
to be extensible a la DoFn (which might also look a lot like just starting
to support SplittableDoFn). But I think that will take longer.

Kenn

On Fri, Jan 20, 2017 at 11:35 AM, Dan Halperin <[email protected]>
wrote:

> I think this was initially motivated by BEAM-758
> <https://issues.apache.org/jira/browse/BEAM-758>. Copying from that issue:
>
>     In the forthcoming runner API, a user will be able to save a pipeline
>     to JSON and then run it repeatedly.
>
>     Many pieces of code (e.g., BigQueryIO.Read or Write) rely on a single
>     random value (nonce). These values are typically generated at pipeline
>     construction time (in PTransform#expand), so that they are deterministic
>     (don't change across retries of DoFns) and global (are the same across
>     all workers).
>
>     However, once the runner API lands, the existing code would result in
>     the same nonce being reused across jobs, which breaks BigQueryIO. Other
>     possible solutions:
>        * Generate the nonce in Create(1) | ParDo, then use it as a side
>          input (sketched just below this list). This should work, as long
>          as side inputs are actually checkpointed, but it does not work for
>          BoundedSource, which cannot accept side inputs.
>        * If a nonce is only needed for the lifetime of one bundle, it can
>          be generated in startBundle and used in
>          processElement/finishBundle/tearDown.
>        * Add some context somewhere that lets user code access the unique
>          step name, and generate a nonce consistently from it, e.g. by
>          hashing. This will usually work, but such a context is similarly
>          not available to sources.
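>
> A rough sketch of the first alternative, using existing SDK pieces (the
> names are illustrative):
>
>     PCollectionView<String> nonceView =
>         p.apply("Seed", Create.of(1))
>          .apply("MakeNonce",
>              MapElements.via(new SimpleFunction<Integer, String>() {
>                @Override
>                public String apply(Integer ignored) {
>                  // Runs at execution time, so each Pipeline.run() sees a
>                  // fresh value -- assuming the runner checkpoints it.
>                  return UUID.randomUUID().toString();
>                }
>              }))
>          .apply(View.<String>asSingleton());
>
> A downstream ParDo would declare .withSideInputs(nonceView) and read the
> value via c.sideInput(nonceView); BoundedSource has no equivalent hook.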
>
> I believe your proposal is to add such a nonce to the root PipelineOptions
> object -- perhaps `String getRunNonce()` or something like that. This
> would let us have a different nonce for every Pipeline.run() call, but it
> would add the requirement that runners populate it.
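>
> As a sketch -- RunNonceOptions is a made-up name, only the shape matters:
>
>     public interface RunNonceOptions extends PipelineOptions {
>       @Description("Populated by the runner, uniquely per Pipeline.run()")
>       String getRunNonce();
>       void setRunNonce(String value);
>     }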
>
> My 2c: this would be an easy change for runners and would unblock the
> issue, but it adds to the demands on runner authors. Longer-term, plumbing
> a context into places like BoundedSource and providing the value there is a
> better idea.
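>
> Hypothetically -- no such context exists today -- that could look like:
>
>     // today the signature is createReader(PipelineOptions options)
>     public abstract BoundedReader<T> createReader(
>         PipelineOptions options, SourceContext context) throws IOException;
>
> where a SourceContext would carry runner-provided values such as the nonce.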
>
> Dan
>
> On Fri, Jan 20, 2017 at 11:30 AM, Davor Bonaci <[email protected]> wrote:
>
> > Expecting runners to populate, or override, SDK-level pipeline options
> > isn't a great thing, particularly in a scenario that would affect
> > correctness.
> >
> > The main thing is discoverability of a subtle API like this -- there's
> > little chance somebody writing a new runner would stumble across this and
> > do the right thing. It would be much better to make the expectations on a
> > runner clear, say, via a runner-provided "context" API. I'd stay away from
> > a pipeline option with a default value.
> >
> > The other contentious topic here is the usage of a job-level or
> > execution-level identifier. This easily becomes ambiguous in the presence
> > of Flink's savepoints, Dataflow's update, fast re-execution, canary vs.
> > production pipelines, cross-job optimizations, etc. I think we'd be better
> > off with a transform-level nonce than a job-level one.
> >
> > Finally, the real solution is to enhance the model and make such
> > functionality available to everyone, e.g., roughly "init" + "checkpoint" +
> > "side-input to source / splittabledofn / composable io".
> >
> > --
> >
> > Practically, to solve the problem at hand quickly, I'd be in favor of a
> > context-based approach.
> >
> > On Thu, Jan 19, 2017 at 10:22 AM, Sam McVeety <[email protected]>
> > wrote:
> >
> > > Hi folks, I'm looking for feedback on whether the following is a
> > > reasonable approach to handling ValueProviders that are intended to be
> > > populated at runtime by a given Runner (e.g. a Dataflow job ID, which is
> > > a GUID from the service). Two potential pieces of a solution:
> > >
> > > 1. Annotate such parameters with @RunnerProvided, which results in an
> > > Exception if the user manually tries to set the parameter.
> > >
> > > 2. Allow for a DefaultValueFactory to be present for the set of Runners
> > > that do not override the parameter.
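> > >
> > > Roughly, with @RunnerProvided as the proposed (not-yet-existing)
> > > annotation and DefaultValueFactory as the existing mechanism:
> > >
> > >     public interface MyOptions extends PipelineOptions {
> > >       @RunnerProvided  // proposed: a user-supplied value would throw
> > >       @Default.InstanceFactory(JobIdFactory.class)
> > >       String getJobId();
> > >       void setJobId(String value);
> > >     }
> > >
> > >     class JobIdFactory implements DefaultValueFactory<String> {
> > >       @Override
> > >       public String create(PipelineOptions options) {
> > >         // Fallback for runners that don't override the parameter.
> > >         return UUID.randomUUID().toString();
> > >       }
> > >     }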
> > >
> > > Best,
> > > Sam
> > >
> >
>
