1. +1 for not requiring explicit type information unless its unavoidable. This one seems easy to fix, but there are other places we should address too (OutputStream, Window operator).
We should probably discuss the Window API separately from this discussion, but to Chris' point: 2. Re: 2 lambdas, the first is an initial value Java 'Supplier', the second is a Samza 'FoldFunction'. It might be clearer to create a new class (e.g., 'Aggregator') with 'getInitialValue' and 'aggregate' methods that users should implement instead of these 2 lambdas. They'll lose the ability to provide these functions as lambdas, but I think overall it'll help readability and understandability. We can then provide default implementations for this class for common aggregations: accumulation (add to collection), sum, max, min, average, percentiles etc. Is this what you meant Chris? 3. I'm also strongly in favor of making the window APIs composable so that we don't have so many variants and parameters. Otherwise, as we add more parameters for event time support and type information for serdes, it'll get really difficult to both discover the appropriate window variant and understand a window specification once written. - Prateek On Tue, Jun 20, 2017 at 10:15 AM, Chris Pettitt < cpett...@linkedin.com.invalid> wrote: > Feedback for PageViewCounterStreamSpecExample: > > https://github.com/nickpan47/samza/blob/new-api-v2/samza- > core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExamp > le.java#L65: > > When we set up the input we had the message type, but it looks like we > are not propagating it via StreamIO.Input thus require a cast here. > The fix seems to be to generify StreamIO.Input. > > https://github.com/nickpan47/samza/blob/new-api-v2/samza- > core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExamp > le.java#L67: > > The two lambdas here are not very intuitive. I'm assuming based on > some previous discussion that these are setting up a fold function? I > would suggest making this more explicit, probably with a fold function > type. > > https://github.com/nickpan47/samza/blob/new-api-v2/samza- > core/src/test/java/org/apache/samza/example/PageViewCounterStreamSpecExamp > le.java#L94: > > The generic type parameters for WindowPane suggest that WindowPanes > must be keyed. This is a reasonable assumption for > "keyedTumblingWindow" but might not make sense for other cases, like a > global combine operation. Beam separates the two ideas into: 1) a > typed WindowValue, basically the primitive for values propagating > through the graph, contains a value and a windowing information and 2) > KV for keyed values. > > FWIW, the larger windowing and grouping concepts are also separated in > Beam. It looks like this is the case with Flink as well. In examples > from both the user specifies the windowing first and then separately > specify the aggregation operation (e.g. group by key, combine, sum, > etc.). This saves from the combinatorial explosion of keyed / global, > unwindowed (batch) / session / fixed / tumbling / etc., GBK / sum / > count / etc. > > - Chris > > On Tue, Jun 20, 2017 at 10:46 AM, Chris Pettitt <cpett...@linkedin.com> > wrote: > > Yi, > > > > What examples should we be looking at for new-api-v2? > > > > 1. samza/samza-core/src/test/java/org/apache/samza/example/ > PageViewCounterStreamSpecExample.java > > > > others? > > > > - Chris > > > > On Mon, Jun 19, 2017 at 5:29 PM, Yi Pan <nickpa...@gmail.com> wrote: > >> Hi, all, > >> > >> Here is the promised code examples for the revised API, and the related > >> change to how we specify serdes in the API: > >> > >> - User example for the new API chagne: > >> https://github.com/nickpan47/samza/tree/new-api-v2 > >> > >> - Prateekās PR for the proposed schema registry change: > >> https://github.com/nickpan47/samza/pull/2/files > >> > >> Please feel free to comment and provide feedbacks! > >> > >> > >> Thanks! > >> > >> > >> -Yi > >> > >> On Tue, Jun 6, 2017 at 11:16 AM, Yi Pan <nickpa...@gmail.com> wrote: > >> > >>> Hi, all, > >>> > >>> Thanks for all the inputs! Finally I got some time to go through the > >>> discussion thread and digest most of the points made above. Here is my > >>> personal summary: > >>> > >>> Consensus on requirements: > >>> > >>> 1. ApplicationRunner needs async APIs. > >>> 2. ApplicationRunner can be hidden from user (except maybe in > config) > >>> 3. StreamApplication is the direct wrapper for the programming > >>> interface (i.e. removing StreamGraph from the user API and allow > users to > >>> call input() and output() from the StreamApplication) in main() > >>> 4. There has to be a serialization format of the StreamApplication > >>> itself, s.t. the tasks can just deserialize and create the user > logic > >>> included in StreamApplication in multiple TaskContext. > >>> 5. JobRunner seems to be a very thin layer on-top-of StreamProcessor > >>> or YarnJob, and it is always a LocalJob in a LocalApplitionRunner > and a > >>> RemoteJob in a RemoteApplicationRunner. There is a desire to remove > it > >>> 6. StreamApplication needs to have some methods to allow > user-injected > >>> global objects for the whole application, such as JmxServer, > >>> MetricsReporter, etc. > >>> > >>> > >>> Some additional discussion points: > >>> > >>> 1. In StreamApplication#input()/output(), what should be the input > / > >>> output parameter? The StreamSpec? Or the actual implementation I/O > object > >>> to provide messages (i.e. similar to socket reader/file reader > object)? In > >>> the later case, we will need to define an abstract layer of > StreamReader > >>> and StreamWriter in the user-facing API that supports read/write of > >>> partitioned streams on top of the SystemConsumer/SystemProducer/ > SystemAdmin > >>> objects. Also, the number of I/O streams via the > StreamReader/StreamWriter > >>> can not be pre-determined (i.e. depending on input stream > partitions and > >>> the groupers). Hence, I am leaning toward to expose StreamSpec in > the API > >>> and let user builds the StreamSpec via SpecBuilder. The actual I/O > objects > >>> will be instantiated when SystemConsumer/SystemProducer are > instantiated, > >>> with the number of physical partitions in each container. > >>> 2. There is a need to support task-level programs via the same > launch > >>> model as well. > >>> > >>> > >>> Some ideas to implement the above requirements: > >>> > >>> 1. StreamGraph#write() should be used internally to generate and > >>> persist the serialized format of user logic. Then, > StreamGraph#read() > >>> should give back a deserialized version of user logic. This would > implies > >>> that the user functions defined in APIs are mandated to be > serializable. > >>> 2. StreamApplication should include a SpecBuilder provides the > >>> instantiation of MessageStream/Stores, which is passed to > >>> StreamApplication#input() / StreamApplication#output() > >>> 3. StreamApplication should also include an internal > ApplicationRunner > >>> instance (config driven, class loaded) to be able to switch between > local > >>> vs remote execution > >>> 4. Implementation of LocalApplicationRunner should directly > >>> instantiate and manage StreamProcessor instances for each job, > removing the > >>> LocalJobRunnner > >>> 5. Implementation of RemoteApplicationRunner should instantiate a > >>> remote JobFactory, create the remote job and submitted it for each > job, > >>> removing the current JobRunner interface > >>> 6. We also need a StreamTaskApplication class that allows user to > >>> create task-level applications, by mandate the constructor with a > parameter > >>> of StreamTaskFactory > >>> > >>> > >>> One more opinion around the status and the waitForFinish(): I would > think > >>> that waitForFinish() is just waiting for the local Runtime to > complete, not > >>> to wait for the remote job to be completed. > >>> > >>> I will be working on revision of SEP-2 and some example user code > example > >>> for now and will share it soon. > >>> > >>> Thanks! > >>> > >>> -Yi > >>> > >>> On Wed, May 3, 2017 at 8:08 AM, Chris Pettitt < > >>> cpett...@linkedin.com.invalid> wrote: > >>> > >>>> Hi Xinyu, > >>>> > >>>> I took a second look at the registerStore API. Would it be possible to > >>>> call > >>>> register storeDirectly on the app, similar to what we're doing with > >>>> app.input (possible with the restriction registerStore must be called > >>>> before we add an operator that uses the store)? Otherwise we'll end up > >>>> having to do two passes on the graph again - similar to the way we > had to > >>>> do a pass to init stream config and then hook up the graph. > >>>> > >>>> Thanks, > >>>> Chris > >>>> > >>>> > >>>> On Fri, Apr 28, 2017 at 8:55 PM, xinyu liu <xinyuliu...@gmail.com> > wrote: > >>>> > >>>> > Right, option #2 seems redundant for defining streams after further > >>>> > discussion here. StreamSpec itself is flexible enough to achieve > both > >>>> > static and programmatic specification of the stream. Agree it's not > >>>> > convenient for now (pretty obvious after looking at your bsr > >>>> > beam.runners.samza.wrapper), and we should provide similar > predefined > >>>> > convenient wrappers for user to create the StreamSpec. In your case > >>>> > something like BoundedStreamSpec.file(....) which will generate the > >>>> system > >>>> > and serialize the data as you did. > >>>> > > >>>> > We're still thinking the callback proposed in #2 can be useful for > >>>> > requirement #6: injecting other user objects in run time, such as > stores > >>>> > and metrics. To simplify the user understanding further, I think we > >>>> might > >>>> > hide the ApplicationRunner and expose the StreamApplication instead, > >>>> which > >>>> > will make requirement #3 not user facing. So the API becomes like: > >>>> > > >>>> > StreamApplication app = StreamApplication.local(config) > >>>> > .init (env -> { > >>>> > env.registerStore("my-store", new MyStoreFactory()); > >>>> > env.registerMetricsReporter("my-reporte", new > >>>> > MyMetricsReporterFactory()); > >>>> > }) > >>>> > .withLifeCycleListener(myListener); > >>>> > > >>>> > app.input(BoundedStreamSpec.create("/sample/input.txt")) > >>>> > .map(...) > >>>> > .window(...) > >>>> > > >>>> > app.run(); > >>>> > > >>>> > For requirement #5, I add a .withLifeCycleListener() in the API, > which > >>>> can > >>>> > trigger the callbacks with life cycle events. > >>>> > > >>>> > For #4: distribution of the jars will be what we have today using > the > >>>> Yarn > >>>> > localization with a remote store like artifactory or http server. We > >>>> > discussed where to put the graph serialization. The current > thinking is > >>>> to > >>>> > define a general interface which can backed by a remote store, like > >>>> Kafka, > >>>> > artifactory or http server. For Kafka, it's straightforward but we > will > >>>> > have the size limit or cut it by ourselves. For the other two, we > need > >>>> to > >>>> > investigate whether we can easily upload jars to our artifactory and > >>>> > localizing it with Yarn. Any opinions on this? > >>>> > > >>>> > Thanks, > >>>> > Xinyu > >>>> > > >>>> > On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt < > >>>> > cpett...@linkedin.com.invalid> wrote: > >>>> > > >>>> > > Your proposal for #1 looks good. > >>>> > > > >>>> > > I'm not quite how to reconcile the proposals for #1 and #2. In #1 > you > >>>> add > >>>> > > the stream spec straight onto the runner while in #2 you do it in > a > >>>> > > callback. If it is either-or, #1 looks a lot better for my > purposes. > >>>> > > > >>>> > > For #4 what mechanism are you using to distribute the JARs? Can > you > >>>> use > >>>> > the > >>>> > > same mechanism to distribute the serialized graph? > >>>> > > > >>>> > > On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu < > xinyuliu...@gmail.com> > >>>> > wrote: > >>>> > > > >>>> > > > btw, I will get to SAMZA-1246 as soon as possible. > >>>> > > > > >>>> > > > Thanks, > >>>> > > > Xinyu > >>>> > > > > >>>> > > > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu < > xinyuliu...@gmail.com> > >>>> > > wrote: > >>>> > > > > >>>> > > > > Let me try to capture the updated requirements: > >>>> > > > > > >>>> > > > > 1. Set up input streams outside StreamGraph, and treat graph > >>>> building > >>>> > > as > >>>> > > > a > >>>> > > > > library (*Fluent, Beam*). > >>>> > > > > > >>>> > > > > 2. Improve ease of use for ApplicationRunner to avoid complex > >>>> > > > > configurations such as zkCoordinator, zkCoordinationService. > >>>> > > > (*Standalone*). > >>>> > > > > Provide some programmatic way to tweak them in the API. > >>>> > > > > > >>>> > > > > 3. Clean up ApplicationRunner into a single interface > (*Fluent*). > >>>> We > >>>> > > can > >>>> > > > > have one or more implementations but it's hidden from the > users. > >>>> > > > > > >>>> > > > > 4. Separate StreamGraph from runtime environment so it can be > >>>> > > serialized > >>>> > > > (*Beam, > >>>> > > > > Yarn*) > >>>> > > > > > >>>> > > > > 5. Better life cycle management of application, parity with > >>>> > > > > StreamProcessor (*Standalone, Beam*). Stats should include > >>>> exception > >>>> > in > >>>> > > > > case of failure (tracked in SAMZA-1246). > >>>> > > > > > >>>> > > > > 6. Support injecting user-defined objects into > ApplicationRunner. > >>>> > > > > > >>>> > > > > Prateek and I iterate on the ApplilcationRunner API based on > these > >>>> > > > > requirements. To support #1, we can set up input streams on > the > >>>> > runner > >>>> > > > > level, which returns the MessageStream and allows graph > building > >>>> > > > > afterwards. The code looks like below: > >>>> > > > > > >>>> > > > > ApplicationRunner runner = ApplicationRunner.local(); > >>>> > > > > runner.input(streamSpec) > >>>> > > > > .map(..) > >>>> > > > > .window(...) > >>>> > > > > runner.run(); > >>>> > > > > > >>>> > > > > StreamSpec is the building block for setting up streams here. > It > >>>> can > >>>> > be > >>>> > > > > set up in different ways: > >>>> > > > > > >>>> > > > > - Direct creation of stream spec, like runner.input(new > >>>> > > StreamSpec(id, > >>>> > > > > system, stream)) > >>>> > > > > - Load from streamId from env or config, like > >>>> > > > runner.input(runner.env(). > >>>> > > > > getStreamSpec(id)) > >>>> > > > > - Canned Spec which generates the StreamSpec with id, > system and > >>>> > > stream > >>>> > > > > to minimize the configuration. For example, > >>>> CollectionSpec.create(new > >>>> > > > > ArrayList[]{1,2,3,4}), which will auto generate the system and > >>>> stream > >>>> > > in > >>>> > > > > the spec. > >>>> > > > > > >>>> > > > > To support #2, we need to be able to set up StreamSpec-related > >>>> > objects > >>>> > > > and > >>>> > > > > factories programmatically in env. Suppose we have the > following > >>>> > before > >>>> > > > > runner.input(...): > >>>> > > > > > >>>> > > > > runner.setup(env /* a writable interface of env*/ -> { > >>>> > > > > env.setStreamSpec(streamId, streamSpec); > >>>> > > > > env.setSystem(systemName, systemFactory); > >>>> > > > > }) > >>>> > > > > > >>>> > > > > runner.setup(->) also provides setup for stores and other > runtime > >>>> > stuff > >>>> > > > > needed for the execution. The setup should be able to > serialized > >>>> to > >>>> > > > config. > >>>> > > > > For #6, I haven't figured out a good way to inject > user-defined > >>>> > objects > >>>> > > > > here yet. > >>>> > > > > > >>>> > > > > With this API, we should be able to also support #4. For > remote > >>>> > > > > runner.run(), the operator user classes/lamdas in the > StreamGraph > >>>> > need > >>>> > > to > >>>> > > > > be serialized. As today, the existing option is to serialize > to a > >>>> > > stream, > >>>> > > > > either the coordinator stream or the pipeline control stream, > >>>> which > >>>> > > will > >>>> > > > > have the size limit per message. Do you see RPC as an option? > >>>> > > > > > >>>> > > > > For this version of API, seems we don't need the > StreamApplication > >>>> > > > wrapper > >>>> > > > > as well as exposing the StreamGraph. Do you think we are on > the > >>>> right > >>>> > > > path? > >>>> > > > > > >>>> > > > > Thanks, > >>>> > > > > Xinyu > >>>> > > > > > >>>> > > > > > >>>> > > > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt < > >>>> > > > > cpett...@linkedin.com.invalid> wrote: > >>>> > > > > > >>>> > > > >> That should have been: > >>>> > > > >> > >>>> > > > >> For #1, Beam doesn't have a hard requirement... > >>>> > > > >> > >>>> > > > >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt < > >>>> > cpett...@linkedin.com > >>>> > > > > >>>> > > > >> wrote: > >>>> > > > >> > >>>> > > > >> > For #1, I doesn't have a hard requirement for any change > from > >>>> > > Samza. A > >>>> > > > >> > very nice to have would be to allow the input systems to be > >>>> set up > >>>> > > at > >>>> > > > >> the > >>>> > > > >> > same time as the rest of the StreamGraph. An even nicer to > have > >>>> > > would > >>>> > > > >> be to > >>>> > > > >> > do away with the callback based approach and treat graph > >>>> building > >>>> > > as a > >>>> > > > >> > library, a la Beam and Flink. > >>>> > > > >> > > >>>> > > > >> > For the moment I've worked around the two pass requirement > >>>> (once > >>>> > for > >>>> > > > >> > config, once for StreamGraph) by introducing an IR layer > >>>> between > >>>> > > Beam > >>>> > > > >> and > >>>> > > > >> > the Samza Fluent translation. The IR layer is convenient > >>>> > independent > >>>> > > > of > >>>> > > > >> > this problem because it makes it easier to switch between > the > >>>> > Fluent > >>>> > > > and > >>>> > > > >> > low-level APIs. > >>>> > > > >> > > >>>> > > > >> > > >>>> > > > >> > For #4, if we had parity with StreamProcessor for lifecycle > >>>> we'd > >>>> > be > >>>> > > in > >>>> > > > >> > great shape. One additional issue with the status call > that I > >>>> may > >>>> > > not > >>>> > > > >> have > >>>> > > > >> > mentioned is that it provides you no way to get at the > cause of > >>>> > > > failure. > >>>> > > > >> > The StreamProcessor API does allow this via the callback. > >>>> > > > >> > > >>>> > > > >> > > >>>> > > > >> > Re. #2 and #3, I'm a big fan of getting rid of the extra > >>>> > > configuration > >>>> > > > >> > indirection you currently have to jump through (this is > also > >>>> > related > >>>> > > > to > >>>> > > > >> > system consumer configuration from #1. It makes it much > easier > >>>> to > >>>> > > > >> discover > >>>> > > > >> > what the configurable parameters are too, if we provide > some > >>>> > > > >> programmatic > >>>> > > > >> > way to tweak them in the API - which can turn into config > under > >>>> > the > >>>> > > > >> hood. > >>>> > > > >> > > >>>> > > > >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu < > >>>> xinyuliu...@gmail.com > >>>> > > > >>>> > > > >> wrote: > >>>> > > > >> > > >>>> > > > >> >> Let me give a shot to summarize the requirements for > >>>> > > > ApplicationRunner > >>>> > > > >> we > >>>> > > > >> >> have discussed so far: > >>>> > > > >> >> > >>>> > > > >> >> - Support environment for passing in user-defined objects > >>>> > (streams > >>>> > > > >> >> potentially) into ApplicationRunner (*Beam*) > >>>> > > > >> >> > >>>> > > > >> >> - Improve ease of use for ApplicationRunner to avoid > complex > >>>> > > > >> >> configurations > >>>> > > > >> >> such as zkCoordinator, zkCoordinationService. > (*Standalone*) > >>>> > > > >> >> > >>>> > > > >> >> - Clean up ApplicationRunner into a single interface > >>>> (*Fluent*). > >>>> > We > >>>> > > > can > >>>> > > > >> >> have one or more implementations but it's hidden from the > >>>> users. > >>>> > > > >> >> > >>>> > > > >> >> - Separate StreamGraph from environment so it can be > >>>> serializable > >>>> > > > >> (*Beam, > >>>> > > > >> >> Yarn*) > >>>> > > > >> >> > >>>> > > > >> >> - Better life cycle management of application, including > >>>> > > > >> >> start/stop/stats (*Standalone, > >>>> > > > >> >> Beam*) > >>>> > > > >> >> > >>>> > > > >> >> > >>>> > > > >> >> One way to address 2 and 3 is to provide pre-packaged > runner > >>>> > using > >>>> > > > >> static > >>>> > > > >> >> factory methods, and the return type will be the > >>>> > ApplicationRunner > >>>> > > > >> >> interface. So we can have: > >>>> > > > >> >> > >>>> > > > >> >> ApplicationRunner runner = ApplicationRunner.zk() / > >>>> > > > >> >> ApplicationRunner.local() > >>>> > > > >> >> / ApplicationRunner.remote() / ApplicationRunner.test(). > >>>> > > > >> >> > >>>> > > > >> >> Internally we will package the right configs and run-time > >>>> > > environment > >>>> > > > >> with > >>>> > > > >> >> the runner. For example, ApplicationRunner.zk() will > define > >>>> all > >>>> > the > >>>> > > > >> >> configs > >>>> > > > >> >> needed for zk coordination. > >>>> > > > >> >> > >>>> > > > >> >> To support 1 and 4, can we pass in a lambda function in > the > >>>> > runner, > >>>> > > > and > >>>> > > > >> >> then we can run the stream graph? Like the following: > >>>> > > > >> >> > >>>> > > > >> >> ApplicationRunner.zk().env(config -> > >>>> > > > environment).run(streamGraph); > >>>> > > > >> >> > >>>> > > > >> >> Then we need a way to pass the environment into the > >>>> StreamGraph. > >>>> > > This > >>>> > > > >> can > >>>> > > > >> >> be done by either adding an extra parameter to each > operator, > >>>> or > >>>> > > > have a > >>>> > > > >> >> getEnv() function in the MessageStream, which seems to be > >>>> pretty > >>>> > > > hacky. > >>>> > > > >> >> > >>>> > > > >> >> What do you think? > >>>> > > > >> >> > >>>> > > > >> >> Thanks, > >>>> > > > >> >> Xinyu > >>>> > > > >> >> > >>>> > > > >> >> > >>>> > > > >> >> > >>>> > > > >> >> > >>>> > > > >> >> > >>>> > > > >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari < > >>>> > > > >> >> pmaheshw...@linkedin.com.invalid> wrote: > >>>> > > > >> >> > >>>> > > > >> >> > Thanks for putting this together Yi! > >>>> > > > >> >> > > >>>> > > > >> >> > I agree with Jake, it does seem like there are a few too > >>>> many > >>>> > > > moving > >>>> > > > >> >> parts > >>>> > > > >> >> > here. That said, the problem being solved is pretty > broad, > >>>> so > >>>> > let > >>>> > > > me > >>>> > > > >> >> try to > >>>> > > > >> >> > summarize my current understanding of the requirements. > >>>> Please > >>>> > > > >> correct > >>>> > > > >> >> me > >>>> > > > >> >> > if I'm wrong or missing something. > >>>> > > > >> >> > > >>>> > > > >> >> > ApplicationRunner and JobRunner first, ignoring test > >>>> > environment > >>>> > > > for > >>>> > > > >> the > >>>> > > > >> >> > moment. > >>>> > > > >> >> > ApplicationRunner: > >>>> > > > >> >> > 1. Create execution plan: Same in Standalone and Yarn > >>>> > > > >> >> > 2. Create intermediate streams: Same logic but different > >>>> leader > >>>> > > > >> election > >>>> > > > >> >> > (ZK-based or pre-configured in standalone, AM in Yarn). > >>>> > > > >> >> > 3. Run jobs: In JVM in standalone. Submit to the > cluster in > >>>> > Yarn. > >>>> > > > >> >> > > >>>> > > > >> >> > JobRunner: > >>>> > > > >> >> > 1. Run the StreamProcessors: Same process in Standalone > & > >>>> Test. > >>>> > > > >> Remote > >>>> > > > >> >> host > >>>> > > > >> >> > in Yarn. > >>>> > > > >> >> > > >>>> > > > >> >> > To get a single ApplicationRunner implementation, like > Jake > >>>> > > > >> suggested, > >>>> > > > >> >> we > >>>> > > > >> >> > need to make leader election and JobRunner > implementation > >>>> > > > pluggable. > >>>> > > > >> >> > There's still the question of whether > ApplicationRunner#run > >>>> API > >>>> > > > >> should > >>>> > > > >> >> be > >>>> > > > >> >> > blocking or non-blocking. It has to be non-blocking in > >>>> YARN. We > >>>> > > > want > >>>> > > > >> it > >>>> > > > >> >> to > >>>> > > > >> >> > be blocking in standalone, but seems like the main > reason is > >>>> > ease > >>>> > > > of > >>>> > > > >> use > >>>> > > > >> >> > when launched from main(). I'd prefer making it > consitently > >>>> > > > >> non-blocking > >>>> > > > >> >> > instead, esp. since in embedded standalone mode (where > the > >>>> > > > processor > >>>> > > > >> is > >>>> > > > >> >> > running in another container) a blocking API would not > be > >>>> > > > >> user-friendly > >>>> > > > >> >> > either. If not, we can add both run and runBlocking. > >>>> > > > >> >> > > >>>> > > > >> >> > Coming to RuntimeEnvironment, which is the least clear > to > >>>> me so > >>>> > > > far: > >>>> > > > >> >> > 1. I don't think RuntimeEnvironment should be > responsible > >>>> for > >>>> > > > >> providing > >>>> > > > >> >> > StreamSpecs for streamIds - they can be obtained with a > >>>> > > config/util > >>>> > > > >> >> class. > >>>> > > > >> >> > The StreamProcessor should only know about logical > streamIds > >>>> > and > >>>> > > > the > >>>> > > > >> >> > streamId <-> actual stream mapping should happen within > the > >>>> > > > >> >> > SystemProducer/Consumer/Admins provided by the > >>>> > > RuntimeEnvironment. > >>>> > > > >> >> > 2. There's also other components that the user might be > >>>> > > interested > >>>> > > > in > >>>> > > > >> >> > providing implementations of in embedded Standalone mode > >>>> (i.e., > >>>> > > not > >>>> > > > >> >> just in > >>>> > > > >> >> > tests) - MetricsRegistry and JMXServer come to mind. > >>>> > > > >> >> > 3. Most importantly, it's not clear to me who creates > and > >>>> > manages > >>>> > > > the > >>>> > > > >> >> > RuntimeEnvironment. It seems like it should be the > >>>> > > > ApplicationRunner > >>>> > > > >> or > >>>> > > > >> >> the > >>>> > > > >> >> > user because of (2) above and because StreamManager also > >>>> needs > >>>> > > > >> access to > >>>> > > > >> >> > SystemAdmins for creating intermediate streams which > users > >>>> > might > >>>> > > > >> want to > >>>> > > > >> >> > mock. But it also needs to be passed down to the > >>>> > StreamProcessor > >>>> > > - > >>>> > > > >> how > >>>> > > > >> >> > would this work on Yarn? > >>>> > > > >> >> > > >>>> > > > >> >> > I think we should figure out how to integrate > >>>> > RuntimeEnvironment > >>>> > > > with > >>>> > > > >> >> > ApplicationRunner before we can make a call on one vs. > >>>> multiple > >>>> > > > >> >> > ApplicationRunner implementations. If we do keep > >>>> > > > >> LocalApplicationRunner > >>>> > > > >> >> and > >>>> > > > >> >> > RemoteApplication (and TestApplicationRunner) separate, > >>>> agree > >>>> > > with > >>>> > > > >> Jake > >>>> > > > >> >> > that we should remove the JobRunners and roll them up > into > >>>> the > >>>> > > > >> >> respective > >>>> > > > >> >> > ApplicationRunners. > >>>> > > > >> >> > > >>>> > > > >> >> > - Prateek > >>>> > > > >> >> > > >>>> > > > >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes < > >>>> > > jacob.m...@gmail.com > >>>> > > > > > >>>> > > > >> >> wrote: > >>>> > > > >> >> > > >>>> > > > >> >> > > Thanks for the SEP! > >>>> > > > >> >> > > > >>>> > > > >> >> > > +1 on introducing these new components > >>>> > > > >> >> > > -1 on the current definition of their roles (see > Design > >>>> > > feedback > >>>> > > > >> >> below) > >>>> > > > >> >> > > > >>>> > > > >> >> > > *Design* > >>>> > > > >> >> > > > >>>> > > > >> >> > > - If LocalJobRunner and RemoteJobRunner handle the > >>>> > different > >>>> > > > >> >> methods > >>>> > > > >> >> > of > >>>> > > > >> >> > > launching a Job, what additional value do the > different > >>>> > > types > >>>> > > > of > >>>> > > > >> >> > > ApplicationRunner and RuntimeEnvironment provide? > It > >>>> seems > >>>> > > > like > >>>> > > > >> a > >>>> > > > >> >> red > >>>> > > > >> >> > > flag > >>>> > > > >> >> > > that all 3 would need to change from environment to > >>>> > > > >> environment. It > >>>> > > > >> >> > > indicates that they don't have proper modularity. > The > >>>> > > > >> >> > > call-sequence-figures > >>>> > > > >> >> > > support this; LocalApplicationRunner and > >>>> > > > RemoteApplicationRunner > >>>> > > > >> >> make > >>>> > > > >> >> > > the > >>>> > > > >> >> > > same calls and the diagram only varies after > >>>> > > jobRunner.start() > >>>> > > > >> >> > > - As far as I can tell, the only difference between > >>>> Local > >>>> > > and > >>>> > > > >> >> Remote > >>>> > > > >> >> > > ApplicationRunner is that one is blocking and the > >>>> other is > >>>> > > > >> >> > > non-blocking. If > >>>> > > > >> >> > > that's all they're for then either the names > should be > >>>> > > changed > >>>> > > > >> to > >>>> > > > >> >> > > reflect > >>>> > > > >> >> > > this, or they should be combined into one > >>>> > ApplicationRunner > >>>> > > > and > >>>> > > > >> >> just > >>>> > > > >> >> > > expose > >>>> > > > >> >> > > separate methods for run() and runBlocking() > >>>> > > > >> >> > > - There isn't much detail on why the main() > methods for > >>>> > > > >> >> Local/Remote > >>>> > > > >> >> > > have such different implementations, how they > receive > >>>> the > >>>> > > > >> >> Application > >>>> > > > >> >> > > (direct vs config), and concretely how the > deployment > >>>> > > scripts, > >>>> > > > >> if > >>>> > > > >> >> any, > >>>> > > > >> >> > > should interact with them. > >>>> > > > >> >> > > > >>>> > > > >> >> > > > >>>> > > > >> >> > > *Style* > >>>> > > > >> >> > > > >>>> > > > >> >> > > - nit: None of the 11 uses of the word "actual" in > the > >>>> doc > >>>> > > are > >>>> > > > >> >> > > *actually* > >>>> > > > >> >> > > needed. :-) > >>>> > > > >> >> > > - nit: Colors of the runtime blocks in the > diagrams are > >>>> > > > >> >> unconventional > >>>> > > > >> >> > > and a little distracting. Reminds me of nai won > bao. > >>>> Now > >>>> > I'm > >>>> > > > >> >> hungry. > >>>> > > > >> >> > :-) > >>>> > > > >> >> > > - Prefer the name "ExecutionEnvironment" over > >>>> > > > >> "RuntimeEnvironment". > >>>> > > > >> >> > The > >>>> > > > >> >> > > term "execution environment" is used > >>>> > > > >> >> > > - The code comparisons for the ApplicationRunners > are > >>>> not > >>>> > > > >> >> > apples-apples. > >>>> > > > >> >> > > The local runner example is an application that > USES > >>>> the > >>>> > > local > >>>> > > > >> >> runner. > >>>> > > > >> >> > > The > >>>> > > > >> >> > > remote runner example is the just the runner code > >>>> itself. > >>>> > > So, > >>>> > > > >> it's > >>>> > > > >> >> not > >>>> > > > >> >> > > readily apparent that we're comparing the main() > >>>> methods > >>>> > and > >>>> > > > not > >>>> > > > >> >> the > >>>> > > > >> >> > > application itself. > >>>> > > > >> >> > > > >>>> > > > >> >> > > > >>>> > > > >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan < > >>>> nickpa...@gmail.com > >>>> > > > >>>> > > > >> wrote: > >>>> > > > >> >> > > > >>>> > > > >> >> > > > Made some updates to clarify the role and functions > of > >>>> > > > >> >> > RuntimeEnvironment > >>>> > > > >> >> > > > in SEP-2. > >>>> > > > >> >> > > > > >>>> > > > >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan < > >>>> > nickpa...@gmail.com > >>>> > > > > >>>> > > > >> >> wrote: > >>>> > > > >> >> > > > > >>>> > > > >> >> > > > > Hi, everyone, > >>>> > > > >> >> > > > > > >>>> > > > >> >> > > > > In light of new features such as fluent API and > >>>> > standalone > >>>> > > > that > >>>> > > > >> >> > > introduce > >>>> > > > >> >> > > > > new deployment / application launch models in > Samza, I > >>>> > > > created > >>>> > > > >> a > >>>> > > > >> >> new > >>>> > > > >> >> > > > SEP-2 > >>>> > > > >> >> > > > > to address the new use cases. SEP-2 link: > >>>> > > > https://cwiki.apache > >>>> > > > >> . > >>>> > > > >> >> > > > > org/confluence/display/SAMZA/S > >>>> EP-2%3A+ApplicationRunner+ > >>>> > > > Design > >>>> > > > >> >> > > > > > >>>> > > > >> >> > > > > Please take a look and give feedbacks! > >>>> > > > >> >> > > > > > >>>> > > > >> >> > > > > Thanks! > >>>> > > > >> >> > > > > > >>>> > > > >> >> > > > > -Yi > >>>> > > > >> >> > > > > > >>>> > > > >> >> > > > > >>>> > > > >> >> > > > >>>> > > > >> >> > > >>>> > > > >> >> > >>>> > > > >> > > >>>> > > > >> > > >>>> > > > >> > >>>> > > > > > >>>> > > > > > >>>> > > > > >>>> > > > >>>> > > >>>> > >>> > >>> >