Thanks Xinyu, that would be a nice improvement! - Chris
On Mon, May 1, 2017 at 9:58 PM, xinyu liu <[email protected]> wrote:

> Looked again at Chris's beam-samza-runner implementation. Seems LocalApplicationRunner.run() should be asynchronous too. Current implementation is actually using a latch to wait for the StreamProcessors to finish, which seems unnecessary. And we can provide a waitUntilFinish() counterpart to the user. I created https://issues.apache.org/jira/browse/SAMZA-1252 to track it.
>
> Thanks,
> Xinyu
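A minimal sketch of the asynchronous run() plus waitUntilFinish() pairing discussed above, assuming the StreamApplication type from this thread exists; the interface name, method signatures, and use of java.time.Duration are illustrative assumptions, not the SAMZA-1252 implementation:

    import java.time.Duration;

    // Hypothetical sketch: run() submits the StreamProcessors and returns
    // immediately, instead of blocking on an internal latch; waiting becomes
    // an explicit, optional call.
    public interface AsyncApplicationRunner {

      /** Submit the application and return without waiting for completion. */
      void run(StreamApplication app);

      /** Block until the application finishes or the timeout expires;
          returns true if it finished within the timeout. */
      boolean waitUntilFinish(StreamApplication app, Duration timeout);
    }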
>
> On Fri, Apr 28, 2017 at 5:55 PM, xinyu liu <[email protected]> wrote:
>
> > Right, option #2 seems redundant for defining streams after further discussion here. StreamSpec itself is flexible enough to achieve both static and programmatic specification of the stream. Agree it's not convenient for now (pretty obvious after looking at your bsr beam.runners.samza.wrapper), and we should provide similar predefined convenience wrappers for users to create the StreamSpec. In your case something like BoundedStreamSpec.file(....) which will generate the system and serialize the data as you did.
> >
> > We're still thinking the callback proposed in #2 can be useful for requirement #6: injecting other user objects at run time, such as stores and metrics. To simplify the user understanding further, I think we might hide the ApplicationRunner and expose the StreamApplication instead, which will make requirement #3 not user facing. So the API becomes like:
> >
> > StreamApplication app = StreamApplication.local(config)
> >     .init(env -> {
> >         env.registerStore("my-store", new MyStoreFactory());
> >         env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
> >     })
> >     .withLifeCycleListener(myListener);
> >
> > app.input(BoundedStreamSpec.create("/sample/input.txt"))
> >     .map(...)
> >     .window(...)
> >
> > app.run();
> >
> > For requirement #5, I added a .withLifeCycleListener() in the API, which can trigger the callbacks with life cycle events.
> >
> > For #4: distribution of the jars will be what we have today using the Yarn localization with a remote store like artifactory or http server. We discussed where to put the graph serialization. The current thinking is to define a general interface which can be backed by a remote store, like Kafka, artifactory or http server. For Kafka, it's straightforward but we will have the size limit or cut it by ourselves. For the other two, we need to investigate whether we can easily upload jars to our artifactory and localize it with Yarn. Any opinions on this?
> >
> > Thanks,
> > Xinyu
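To make the life cycle callbacks in requirement #5 concrete, a listener handed to the proposed withLifeCycleListener() might look roughly like the sketch below; the interface name and callback signatures are assumptions for illustration, not an existing Samza API. The onFailure(Throwable) callback reflects the point made elsewhere in the thread that failure status should carry the exception (SAMZA-1246):

    // Hypothetical listener for the proposed withLifeCycleListener(); every
    // name here is illustrative only.
    public interface StreamApplicationLifeCycleListener {

      /** Called once the underlying StreamProcessors have started. */
      void onStart();

      /** Called after a clean shutdown. */
      void onStop();

      /** Called on failure, exposing the cause rather than just a status flag. */
      void onFailure(Throwable cause);
    }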
> >
> > On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt <[email protected]> wrote:
> >
> > > Your proposal for #1 looks good.
> > >
> > > I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add the stream spec straight onto the runner while in #2 you do it in a callback. If it is either-or, #1 looks a lot better for my purposes.
> > >
> > > For #4 what mechanism are you using to distribute the JARs? Can you use the same mechanism to distribute the serialized graph?
> > >
> > > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <[email protected]> wrote:
> > >
> > > > btw, I will get to SAMZA-1246 as soon as possible.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <[email protected]> wrote:
> > > >
> > > > > Let me try to capture the updated requirements:
> > > > >
> > > > > 1. Set up input streams outside StreamGraph, and treat graph building as a library (*Fluent, Beam*).
> > > > >
> > > > > 2. Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator, zkCoordinationService (*Standalone*). Provide some programmatic way to tweak them in the API.
> > > > >
> > > > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations but it's hidden from the users.
> > > > >
> > > > > 4. Separate StreamGraph from the runtime environment so it can be serialized (*Beam, Yarn*).
> > > > >
> > > > > 5. Better life cycle management of the application, parity with StreamProcessor (*Standalone, Beam*). Stats should include the exception in case of failure (tracked in SAMZA-1246).
> > > > >
> > > > > 6. Support injecting user-defined objects into ApplicationRunner.
> > > > >
> > > > > Prateek and I iterated on the ApplicationRunner API based on these requirements. To support #1, we can set up input streams on the runner level, which returns the MessageStream and allows graph building afterwards. The code looks like below:
> > > > >
> > > > > ApplicationRunner runner = ApplicationRunner.local();
> > > > > runner.input(streamSpec)
> > > > >     .map(..)
> > > > >     .window(...)
> > > > > runner.run();
> > > > >
> > > > > StreamSpec is the building block for setting up streams here. It can be set up in different ways:
> > > > >
> > > > > - Direct creation of the stream spec, like runner.input(new StreamSpec(id, system, stream))
> > > > > - Load the streamId from env or config, like runner.input(runner.env().getStreamSpec(id))
> > > > > - Canned spec which generates the StreamSpec with id, system and stream to minimize the configuration. For example, CollectionSpec.create(new ArrayList[]{1,2,3,4}), which will auto-generate the system and stream in the spec.
> > > > >
> > > > > To support #2, we need to be able to set up StreamSpec-related objects and factories programmatically in env. Suppose we have the following before runner.input(...):
> > > > >
> > > > > runner.setup(env /* a writable interface of env */ -> {
> > > > >     env.setStreamSpec(streamId, streamSpec);
> > > > >     env.setSystem(systemName, systemFactory);
> > > > > })
> > > > >
> > > > > runner.setup(...) also provides setup for stores and other runtime pieces needed for the execution. The setup should be able to be serialized to config. For #6, I haven't figured out a good way to inject user-defined objects here yet.
> > > > >
> > > > > With this API, we should be able to also support #4. For remote runner.run(), the operator user classes/lambdas in the StreamGraph need to be serialized. As of today, the existing option is to serialize to a stream, either the coordinator stream or the pipeline control stream, which will have the size limit per message. Do you see RPC as an option?
> > > > >
> > > > > For this version of the API, it seems we don't need the StreamApplication wrapper as well as exposing the StreamGraph. Do you think we are on the right path?
> > > > >
> > > > > Thanks,
> > > > > Xinyu
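A rough sketch of the "canned spec" idea above, assuming a hypothetical CollectionSpec helper; the class, the auto-generated names, and the StreamSpec constructor arguments (which follow the new StreamSpec(id, system, stream) example in the mail) are all illustrative:

    import java.util.List;
    import java.util.UUID;

    // Hypothetical helper that wraps an in-memory collection as a StreamSpec,
    // auto-generating the system and stream names so no configuration is needed.
    public final class CollectionSpec {

      private CollectionSpec() { }

      public static <T> StreamSpec create(List<T> elements) {
        String id = "collection-" + UUID.randomUUID();
        // A real implementation would also register an in-memory system that
        // replays 'elements'; that wiring is elided from this sketch.
        // Argument order (id, system, stream) follows the example in the mail.
        return new StreamSpec(id, "in-memory", id);
      }
    }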
> > > > >
> > > > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <[email protected]> wrote:
> > > > >
> > > > > > That should have been:
> > > > > >
> > > > > > For #1, Beam doesn't have a hard requirement...
> > > > > >
> > > > > > On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <[email protected]> wrote:
> > > > > >
> > > > > > > For #1, I doesn't have a hard requirement for any change from Samza. A very nice to have would be to allow the input systems to be set up at the same time as the rest of the StreamGraph. An even nicer to have would be to do away with the callback based approach and treat graph building as a library, a la Beam and Flink.
> > > > > > >
> > > > > > > For the moment I've worked around the two-pass requirement (once for config, once for StreamGraph) by introducing an IR layer between Beam and the Samza Fluent translation. The IR layer is convenient independent of this problem because it makes it easier to switch between the Fluent and low-level APIs.
> > > > > > >
> > > > > > > For #4, if we had parity with StreamProcessor for lifecycle we'd be in great shape. One additional issue with the status call that I may not have mentioned is that it provides you no way to get at the cause of failure. The StreamProcessor API does allow this via the callback.
> > > > > > >
> > > > > > > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration indirection you currently have to jump through (this is also related to system consumer configuration from #1). It makes it much easier to discover what the configurable parameters are too, if we provide some programmatic way to tweak them in the API - which can turn into config under the hood.
> > > > > > >
> > > > > > > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <[email protected]> wrote:
> > > > > > >
> > > > > > > > Let me take a shot at summarizing the requirements for ApplicationRunner we have discussed so far:
> > > > > > > >
> > > > > > > > - Support environment for passing in user-defined objects (streams potentially) into ApplicationRunner (*Beam*)
> > > > > > > >
> > > > > > > > - Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator, zkCoordinationService (*Standalone*)
> > > > > > > >
> > > > > > > > - Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations but it's hidden from the users.
> > > > > > > >
> > > > > > > > - Separate StreamGraph from environment so it can be serializable (*Beam, Yarn*)
> > > > > > > >
> > > > > > > > - Better life cycle management of application, including start/stop/stats (*Standalone, Beam*)
> > > > > > > >
> > > > > > > > One way to address 2 and 3 is to provide pre-packaged runners using static factory methods, and the return type will be the ApplicationRunner interface. So we can have:
> > > > > > > >
> > > > > > > > ApplicationRunner runner = ApplicationRunner.zk() / ApplicationRunner.local() / ApplicationRunner.remote() / ApplicationRunner.test()
> > > > > > > >
> > > > > > > > Internally we will package the right configs and run-time environment with the runner. For example, ApplicationRunner.zk() will define all the configs needed for zk coordination.
> > > > > > > >
> > > > > > > > To support 1 and 4, can we pass in a lambda function in the runner, and then we can run the stream graph? Like the following:
> > > > > > > >
> > > > > > > > ApplicationRunner.zk().env(config -> environment).run(streamGraph);
> > > > > > > >
> > > > > > > > Then we need a way to pass the environment into the StreamGraph. This can be done by either adding an extra parameter to each operator, or having a getEnv() function in the MessageStream, which seems to be pretty hacky.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xinyu
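The pre-packaged factory idea above might reduce to something like the following sketch; the factory names mirror the mail, but the zkConnect parameter, the fromConfig() helper, and the config keys shown are illustrative assumptions rather than a definitive implementation:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical pre-packaged runner factories that hide coordination config
    // from the user; the keys and class names below are illustrative only.
    public final class ApplicationRunners {

      private ApplicationRunners() { }

      public static ApplicationRunner zk(String zkConnect) {
        Map<String, String> config = new HashMap<>();
        // Bundle the ZooKeeper coordination settings so users never spell out
        // zkCoordinator/zkCoordinationService configs by hand.
        config.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
        config.put("job.coordinator.zk.connect", zkConnect);
        return fromConfig(config);
      }

      public static ApplicationRunner local() {
        // Single-JVM runner with no external coordination.
        return fromConfig(new HashMap<>());
      }

      private static ApplicationRunner fromConfig(Map<String, String> config) {
        // Concrete runner construction is elided from this sketch.
        throw new UnsupportedOperationException("sketch only");
      }
    }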
> > > > > > > >
> > > > > > > > On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Thanks for putting this together Yi!
> > > > > > > > >
> > > > > > > > > I agree with Jake, it does seem like there are a few too many moving parts here. That said, the problem being solved is pretty broad, so let me try to summarize my current understanding of the requirements. Please correct me if I'm wrong or missing something.
> > > > > > > > >
> > > > > > > > > ApplicationRunner and JobRunner first, ignoring the test environment for the moment.
> > > > > > > > >
> > > > > > > > > ApplicationRunner:
> > > > > > > > > 1. Create execution plan: same in Standalone and Yarn.
> > > > > > > > > 2. Create intermediate streams: same logic but different leader election (ZK-based or pre-configured in standalone, AM in Yarn).
> > > > > > > > > 3. Run jobs: in the JVM in standalone; submit to the cluster in Yarn.
> > > > > > > > >
> > > > > > > > > JobRunner:
> > > > > > > > > 1. Run the StreamProcessors: same process in Standalone & Test; remote host in Yarn.
> > > > > > > > >
> > > > > > > > > To get a single ApplicationRunner implementation, like Jake suggested, we need to make leader election and the JobRunner implementation pluggable. There's still the question of whether the ApplicationRunner#run API should be blocking or non-blocking. It has to be non-blocking in YARN. We want it to be blocking in standalone, but it seems like the main reason is ease of use when launched from main(). I'd prefer making it consistently non-blocking instead, esp. since in embedded standalone mode (where the processor is running in another container) a blocking API would not be user-friendly either. If not, we can add both run and runBlocking.
> > > > > > > > >
> > > > > > > > > Coming to RuntimeEnvironment, which is the least clear to me so far:
> > > > > > > > > 1. I don't think RuntimeEnvironment should be responsible for providing StreamSpecs for streamIds - they can be obtained with a config/util class. The StreamProcessor should only know about logical streamIds and the streamId <-> actual stream mapping should happen within the SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> > > > > > > > > 2. There are also other components that the user might be interested in providing implementations of in embedded Standalone mode (i.e., not just in tests) - MetricsRegistry and JMXServer come to mind.
> > > > > > > > > 3. Most importantly, it's not clear to me who creates and manages the RuntimeEnvironment. It seems like it should be the ApplicationRunner or the user because of (2) above and because StreamManager also needs access to SystemAdmins for creating intermediate streams, which users might want to mock. But it also needs to be passed down to the StreamProcessor - how would this work on Yarn?
> > > > > > > > >
> > > > > > > > > I think we should figure out how to integrate RuntimeEnvironment with ApplicationRunner before we can make a call on one vs. multiple ApplicationRunner implementations. If we do keep LocalApplicationRunner and RemoteApplicationRunner (and TestApplicationRunner) separate, I agree with Jake that we should remove the JobRunners and roll them up into the respective ApplicationRunners.
> > > > > > > > >
> > > > > > > > > - Prateek
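One way to read "make leader election and the JobRunner implementation pluggable" is sketched below; every interface and method name is hypothetical, ExecutionPlan stands in for whatever planning output the runner produces, and the split into a coordination strategy plus a job launcher is just one possible decomposition:

    // Hypothetical decomposition that would let a single ApplicationRunner
    // implementation serve standalone and YARN; all names are illustrative.
    public interface CoordinationStrategy {
      /** ZK-based election in standalone, AM-driven on YARN. */
      void createIntermediateStreams(ExecutionPlan plan);
    }

    public interface JobLauncher {
      /** In-JVM StreamProcessors for standalone/test, cluster submission for YARN. */
      void launch(ExecutionPlan plan);
    }

    public class PluggableApplicationRunner {
      private final CoordinationStrategy coordination;
      private final JobLauncher launcher;

      public PluggableApplicationRunner(CoordinationStrategy coordination, JobLauncher launcher) {
        this.coordination = coordination;
        this.launcher = launcher;
      }

      /** Non-blocking, as suggested above; waiting can be a separate call. */
      public void run(ExecutionPlan plan) {
        coordination.createIntermediateStreams(plan);
        launcher.launch(plan);
      }
    }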
> > > > > > > > >
> > > > > > > > > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the SEP!
> > > > > > > > > >
> > > > > > > > > > +1 on introducing these new components
> > > > > > > > > > -1 on the current definition of their roles (see Design feedback below)
> > > > > > > > > >
> > > > > > > > > > *Design*
> > > > > > > > > >
> > > > > > > > > > - If LocalJobRunner and RemoteJobRunner handle the different methods of launching a Job, what additional value do the different types of ApplicationRunner and RuntimeEnvironment provide? It seems like a red flag that all 3 would need to change from environment to environment. It indicates that they don't have proper modularity. The call-sequence figures support this; LocalApplicationRunner and RemoteApplicationRunner make the same calls and the diagram only varies after jobRunner.start().
> > > > > > > > > > - As far as I can tell, the only difference between Local and Remote ApplicationRunner is that one is blocking and the other is non-blocking. If that's all they're for then either the names should be changed to reflect this, or they should be combined into one ApplicationRunner that just exposes separate methods for run() and runBlocking().
> > > > > > > > > > - There isn't much detail on why the main() methods for Local/Remote have such different implementations, how they receive the Application (direct vs config), and concretely how the deployment scripts, if any, should interact with them.
> > > > > > > > > >
> > > > > > > > > > *Style*
> > > > > > > > > >
> > > > > > > > > > - nit: None of the 11 uses of the word "actual" in the doc are *actually* needed. :-)
> > > > > > > > > > - nit: Colors of the runtime blocks in the diagrams are unconventional and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
> > > > > > > > > > - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The term "execution environment" is used
> > > > > > > > > > - The code comparisons for the ApplicationRunners are not apples-to-apples. The local runner example is an application that USES the local runner. The remote runner example is just the runner code itself. So, it's not readily apparent that we're comparing the main() methods and not the application itself.
> > > > > > > > > >
> > > > > > > > > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Made some updates to clarify the role and functions of RuntimeEnvironment in SEP-2.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <[email protected]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi, everyone,
> > > > > > > > > > > >
> > > > > > > > > > > > In light of new features such as the fluent API and standalone that introduce new deployment / application launch models in Samza, I created a new SEP-2 to address the new use cases. SEP-2 link: https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
> > > > > > > > > > > >
> > > > > > > > > > > > Please take a look and give feedback!
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks!
> > > > > > > > > > > >
> > > > > > > > > > > > -Yi
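Jacob's suggestion above of collapsing Local/Remote into a single runner that exposes both run() and runBlocking() might look roughly like this sketch; the class name, the isFinished() probe, and the polling loop are assumptions, not the SEP-2 design:

    // Hypothetical single runner exposing both non-blocking and blocking entry
    // points; runBlocking() is just run() plus an explicit wait.
    public abstract class UnifiedApplicationRunner {

      /** Submit the application and return immediately. */
      public abstract void run(StreamApplication app);

      /** True once the application has reached a terminal state. */
      public abstract boolean isFinished(StreamApplication app);

      /** Blocking convenience wrapper built on top of the non-blocking run(). */
      public void runBlocking(StreamApplication app) throws InterruptedException {
        run(app);
        while (!isFinished(app)) {
          Thread.sleep(1000); // poll once per second in this sketch
        }
      }
    }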
