[
https://issues.apache.org/jira/browse/QUARKS-195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402752#comment-15402752
]
Dale LaBossiere commented on QUARKS-195:
----------------------------------------
Yup, the desire for an initialization interface was noted here:
https://cwiki.apache.org/confluence/display/QUARKS/Experiences+with+ControlService.
However, a Functional initialization interface doesn't eliminate the need for
the oplet based fanin() to build the Barrier plumbing op.
Adding a Functional initialization interface is goodness. But in the long run
I also think there will be a need to easily add Oplets of various forms to a
topology so we should just embrace that need and design a clean way to expose
that.
e.g., move all oplet based methods out of TStream into a new OpletTStream
interface and
add "asOpletTStream()" to TStream for use like:
```
TStream<Foo> foo = ...;
foo.asOpletTStream().fanin(new SomeFanInOplet(...)).map(...);
foo.asOpletTStream().peek(new SomePeekOplet(...)).map(...);
foo.asOpletTStream().pipe(...).map(...)
foo.asOpletTStream().sink(...);
```
> Metrics.{counter,rateMeter}() shouldn't use TStream.pipe(); add
> TStream.peek(Peek)
> ----------------------------------------------------------------------------------
>
> Key: QUARKS-195
> URL: https://issues.apache.org/jira/browse/QUARKS-195
> Project: Quarks
> Issue Type: Bug
> Components: API
> Reporter: Dale LaBossiere
> Assignee: Dale LaBossiere
> Priority: Minor
>
> Add `TStream.peek(Peek<T>)` and change `Metrics.counter(TStream)` and
> `Metrics.rateMeter(TStream)` to use it instead of `pipe()`.
> The author of CounterOp and RateMeter implemented the functionality as Peek
> Oplets not as Consumer functions. Lacking a TStream.peek(Peek),
> TStream.pipe() must be used to add the Peek oplets to the topology.
> The runtime treats TStream.peek(Consumer) generated Peek oplets rather
> differently than pipe related oplets (added via pipe() or indirectly via
> pipe-ish functional methods): see Connector.connect() vs Connector.peek(),
> and TStream.peek() returns "this" whereas the addition of pipe oplets returns
> a new TStream.
> The use of pipe() in this case is partially responsible for the effect
> reported in QUARKS-189.
> Adding TStream.peek(Peek) enables users/authors of Peek oplets to get the
> same peek-ish behavior as their functional peeker brethen. It continues to
> flesh out the general ability of API clients to implement and add oplets to
> the topology.
> The growing number of "oplet" based analogs to the "function" based methods
> makes me wonder if the oplet ones should be broken out into another interface
> that TStream implements (`OpletTStream`?). It would contain the current
> `pipe(Pipe)`, `fanin(FanIn,List)`, `sink(Sink)`, and the new `peek(Peek)`,
> and any others that may be needed in the future - e.g., a `split(Split)`
> and/or one that can handle multiple iports and oports.
> Instead, TStream.pipe() (ConnectorStream.pipe()) could be modified to deal
> with Pipe oplet args in the desired manner and document that Pipe oplets
> receive this special treatment and that pipe() returns "this" for them
> instead of a new TStream. Adding TStream.peek(Peek) seems like a clearer
> alternative, and perhaps oplet.core.Peek should not extend Pipe?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)