Note that the Runner API makes the problem and solution cross-runner, but the problem exists separately for each runner today. So a near-term solution makes sense as long as it is forward-looking (broadcasting runner-provided info to user fns) or has a TTL (putting extra stuff into BoundedSource).
I also don't love the PipelineOptions-as-broadcast-parameters approach, but I foresee that we will eventually benefit from a generic mechanism along these lines. I wouldn't want to rush into anything that we can't change. This should probably be designed first in the Fn API (the runner sending metadata at some level of granularity to the SDK harness), with the SDK then translating that data idiomatically to the user Fn.

As for the alternative of adding a context to selected BoundedSource methods, I can get behind it, with the strong caveat that my support is only because that approach is already "deprecated". We have explicitly moved away from it in DoFn, and it is only because of time constraints that we have not done the same for CombineFn. Parameter buckets are an OK way to allow backwards-compatible (for pipeline authors, not runner authors) extension of parameter lists in interfaces that users implement, but we have developed a much better way, sketched just below. (At the generic level of the Fn API such distinctions don't exist; this point is largely about the Java SDK itself.)

With SplittableDoFn, all sources and sinks will be DoFns, so the extensibility of DoFn applies, and it is better than a context / kwargs-like parameter bucket. This allows all the proposed solutions to coexist easily while being invisible in a pipeline when not required, yet trivially discoverable by runner authors pre-Fn API. (Post-Fn API, runners will ship the parameters generically and the SDK will deliver them in a way that the runner neither knows nor cares about.)

If this is not explicitly intended as a short-term measure to get us through until we have SDF, then we should instead consider upgrading BoundedSource to be extensible a la DoFn (which might also look a lot like just starting to support SplittableDoFn). But I think that will take longer.

Kenn
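A minimal sketch of the "much better way" Kenn refers to: the annotation-driven DoFn, where the SDK inspects the @ProcessElement signature and injects only the parameters it declares. FormatFn and the RunnerContext grab-bag are illustrative names, not APIs from this thread.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // Parameter-injection style: the SDK reflects on this signature and
    // supplies exactly what is declared, so new runner-provided parameters
    // can be added later without breaking existing DoFns.
    class FormatFn extends DoFn<Long, String> {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow window) {
        // 'window' is injected only because this DoFn asked for it; a DoFn
        // that omits the parameter never sees it.
        c.output(window.maxTimestamp() + ": " + c.element());
      }
    }

    // By contrast, a context/kwargs-like parameter bucket would force every
    // implementer (and every runner) to carry the whole bucket:
    //   public void process(ProcessContext c, RunnerContext everythingElse)
    // where RunnerContext is a hypothetical grab-bag of runner-provided info.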
On Fri, Jan 20, 2017 at 11:35 AM, Dan Halperin <[email protected]> wrote:

> I think this was initially motivated by BEAM-758
> <https://issues.apache.org/jira/browse/BEAM-758>. Copying from that issue:
>
> In the forthcoming runner API, a user will be able to save a pipeline to
> JSON and then run it repeatedly.
>
> Many pieces of code (e.g., BigQueryIO.Read or Write) rely on a single
> random value (nonce). These values are typically generated at pipeline
> construction time (in PTransform#expand), so that they are deterministic
> (don't change across retries of DoFns) and global (the same across all
> workers).
>
> However, once the runner API lands, the existing code would result in the
> same nonce being reused across jobs, which breaks BigQueryIO. Other
> possible solutions:
>
> * Generate the nonce in Create(1) | ParDo, then use it as a side input (a
> sketch follows the quoted thread below). This should work, as long as side
> inputs are actually checkpointed, but it does not work for BoundedSource,
> which cannot accept side inputs.
>
> * If a nonce is only needed for the lifetime of one bundle, it can be
> generated in startBundle and used in processElement/finishBundle/tearDown.
>
> * Add some context somewhere that lets user code access the unique step
> name, and somehow generate a nonce consistently from it, e.g. by hashing.
> This will usually work, but it is similarly not available to sources.
>
> I believe your proposal is to add such a nonce to the root PipelineOptions
> object -- perhaps `String getRunNonce()` or something like that. This
> would let us have a different nonce for every Pipeline.run() call, but it
> would add the requirement that runners populate it.
>
> My 2c: This would be an easy change for runners and unblocks the issue,
> but it complicates the demands on runner authors. Longer-term, plumbing a
> context into places like BoundedSource and providing the value there is a
> better idea.
>
> Dan
>
> On Fri, Jan 20, 2017 at 11:30 AM, Davor Bonaci <[email protected]> wrote:
>
> > Expecting runners to populate, or override, SDK-level pipeline options
> > isn't a great thing, particularly in a scenario that would affect
> > correctness.
> >
> > The main issue is the discoverability of a subtle API like this --
> > there's little chance somebody writing a new runner would stumble
> > across it and do the right thing. It would be much better to make the
> > expectations of a runner clear, say, via a runner-provided "context"
> > API. I'd stay away from a pipeline option with a default value.
> >
> > The other contentious topic here is the use of a job-level or
> > execution-level identifier. This easily becomes ambiguous in the
> > presence of Flink's savepoints, Dataflow's update, fast re-execution,
> > canary vs. production pipelines, cross-job optimizations, etc. I think
> > we'd be better off with a transform-level nonce than a job-level one.
> >
> > Finally, the real solution is to enhance the model and make such
> > functionality available to everyone, e.g., roughly "init" + "checkpoint"
> > + "side-input to source / splittabledofn / composable io".
> >
> > --
> >
> > Practically, to solve the problem at hand quickly, I'd be in favor of a
> > context-based approach.
> >
> > On Thu, Jan 19, 2017 at 10:22 AM, Sam McVeety <[email protected]>
> > wrote:
> >
> > > Hi folks, I'm looking for feedback on whether the following is a
> > > reasonable approach to handling ValueProviders that are intended to
> > > be populated at runtime by a given Runner (e.g. a Dataflow job ID,
> > > which is a GUID from the service). Two potential pieces of a solution
> > > (both are sketched at the end below):
> > >
> > > 1. Annotate such parameters with @RunnerProvided, which results in an
> > > Exception if the user manually tries to set the parameter.
> > >
> > > 2. Allow for a DefaultValueFactory to be present for the set of
> > > Runners that do not override the parameter.
> > >
> > > Best,
> > > Sam
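For concreteness, a minimal sketch of the first workaround listed in BEAM-758 above: generate the nonce at execution time in Create(1) | ParDo and broadcast it as a side input. This assumes an existing Pipeline p; names are illustrative.

    import java.util.UUID;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // The nonce is created inside a DoFn at execution time, not in
    // PTransform#expand at construction time, so every Pipeline.run() of a
    // saved pipeline gets a fresh value.
    PCollection<String> nonce =
        p.apply(Create.of(1))
         .apply(ParDo.of(new DoFn<Integer, String>() {
           @ProcessElement
           public void process(ProcessContext c) {
             c.output(UUID.randomUUID().toString());
           }
         }));
    PCollectionView<String> nonceView = nonce.apply(View.<String>asSingleton());

    // Downstream ParDos declare .withSideInputs(nonceView) and read it with
    // c.sideInput(nonceView); as noted above, BoundedSource cannot do this
    // because it does not accept side inputs.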

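And a hedged sketch of Sam's two-part proposal, combined with Dan's getRunNonce idea. The @RunnerProvided annotation is hypothetical (it does not exist in the SDK); @Default.InstanceFactory and DefaultValueFactory are real PipelineOptions mechanisms.

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.DefaultValueFactory;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface NonceOptions extends PipelineOptions {
      // @RunnerProvided          // hypothetical (piece 1): a user setting
      //                          // this manually would get an Exception
      @Default.InstanceFactory(RandomNonceFactory.class)
      String getRunNonce();
      void setRunNonce(String value);

      // Piece 2: a fallback for runners that do not override the value.
      class RandomNonceFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          return java.util.UUID.randomUUID().toString();
        }
      }
    }

Note Davor's caveat above: a default value here hides the runner's obligation, which is why he would stay away from a pipeline option with a default.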