GitHub user wmarshall484 commented on a diff in the pull request:
https://github.com/apache/incubator-quarks/pull/60#discussion_r58492726
--- Diff: spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java ---
@@ -163,6 +175,100 @@ protected Graph graph() {
}
@Override
+ public <J, U, K> TStream<J> join(Function<T, K> keyer,
+ TWindow<U, K> twindow, BiFunction<T, List<U>, J> joiner) {
+
+ if(twindow instanceof TWindowImpl){
+ TStream<U> lastStream = twindow.feeder();
+ BiFunction<List<U>,K, Object> processor = Functions.synchronizedBiFunction((list, key) -> null);
+ Window<U, K, LinkedList<U>> window = Windows.lastNProcessOnInsert(((TWindowImpl<U, K>)twindow).getSize(), twindow.getKeyFunction());
+ Aggregate<U,Object,K> op = new Aggregate<U,Object,K>(window, processor);
+ lastStream.pipe(op);
+ return this.map((tuple) -> {
+ Partition<U, K, ? extends List<U>> part = window.getPartitions().get(keyer.apply(tuple));
+ if(part == null)
+ return null;
+ J ret;
+ synchronized (part) {
+ List<U> last = part.getContents();
+ ret = joiner.apply(tuple, last);
+ }
+ return ret;
+ });
+ }
+
+ else if (twindow instanceof TWindowTimeImpl){
+ TStream<U> lastStream = twindow.feeder();
+ BiFunction<List<U>,K, Object> processor = Functions.synchronizedBiFunction((list, key) -> null);
+ long time = ((TWindowTimeImpl<U, K>)(twindow)).getTime();
+ TimeUnit unit = ((TWindowTimeImpl<U, K>)(twindow)).getUnit();
+ Window<U, K, InsertionTimeList<U>> window =
+ Windows.window(
+ alwaysInsert(),
+ scheduleEvictIfEmpty(time, unit),
+ evictOlderWithProcess(time, unit),
+ processOnInsert(),
+ twindow.getKeyFunction(),
+ insertionTimeList());
+ Aggregate<U,Object,K> op = new Aggregate<U,Object,K>(window, processor);
--- End diff --
The Aggregate isn't actually unconnected. With the next line
```java
lastStream.pipe(op);
```
its state is updated from the tuples on the `twindow.feeder()` stream. The
`window` object created earlier is captured by closure in the lambda passed to
`map` (the lambda that calls `joiner`), so that lambda can read the window's
partitions. Sharing the window through a closure removes the need to create an
additional oplet with two input ports. The downside is that this approach only
works while the Quarks application runs inside a single JVM, which is the case
for now (but may not be later).
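To make the closure argument concrete, here is a minimal plain-Java sketch. It deliberately does not use the Quarks API: `ClosureJoinSketch`, `LastNWindow` and `joinFunction` are hypothetical names standing in for the `Window`/`Aggregate` pair and the lambda passed to `map` in the diff. One path (`insert`) updates the shared window the way the `Aggregate` piped onto `twindow.feeder()` does; the other (the returned `Function`) reads the same window through a captured reference, so no two-input oplet is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ClosureJoinSketch {

    // Hypothetical stand-in for the Quarks window (NOT the Quarks API):
    // keeps the last `size` tuples per key, similar in spirit to lastNProcessOnInsert.
    static final class LastNWindow<U, K> {
        private final int size;
        private final Function<U, K> keyFn;
        private final Map<K, LinkedList<U>> partitions = new HashMap<>();

        LastNWindow(int size, Function<U, K> keyFn) {
            this.size = size;
            this.keyFn = keyFn;
        }

        // Feeder side: plays the role of the Aggregate piped onto twindow.feeder().
        synchronized void insert(U tuple) {
            LinkedList<U> part =
                partitions.computeIfAbsent(keyFn.apply(tuple), k -> new LinkedList<>());
            part.addLast(tuple);
            if (part.size() > size) {
                part.removeFirst(); // evict the oldest tuple in this partition
            }
        }

        // Lookup side: a snapshot of one partition, or null if the key is unseen.
        synchronized List<U> contents(K key) {
            LinkedList<U> part = partitions.get(key);
            return part == null ? null : new ArrayList<>(part);
        }
    }

    // Builds the per-tuple function for the joining stream. The window is
    // captured by the returned lambda (a closure), so the lookup needs no
    // second input port; it only needs the same object in the same JVM.
    static <T, U, K, J> Function<T, J> joinFunction(
            LastNWindow<U, K> window,
            Function<T, K> keyer,
            BiFunction<T, List<U>, J> joiner) {
        return tuple -> {
            List<U> last = window.contents(keyer.apply(tuple));
            if (last == null) {
                return null; // mirrors `if(part == null) return null;` in the diff
            }
            return joiner.apply(tuple, last);
        };
    }

    public static void main(String[] args) {
        // Feeder tuples are "key:value" strings; the key is the part before ':'.
        LastNWindow<String, String> window = new LastNWindow<>(2, s -> s.split(":")[0]);
        for (String r : new String[] {"a:1", "a:2", "a:3", "b:7"}) {
            window.insert(r);
        }

        Function<String, String> keyer = id -> id;
        BiFunction<String, List<String>, String> joiner = (id, last) -> id + "=" + last;
        Function<String, String> join = joinFunction(window, keyer, joiner);

        System.out.println(join.apply("a")); // a=[a:2, a:3]
        System.out.println(join.apply("c")); // null (no partition yet)

        // Tuples inserted later are visible through the captured reference.
        window.insert("c:9");
        System.out.println(join.apply("c")); // c=[c:9]
    }
}
```

One design difference worth flagging: the diff calls `joiner` while holding the partition's monitor (`synchronized (part)`), whereas this sketch copies the partition under the window's lock and runs `joiner` on the copy, so a slow joiner cannot block the feeder. Both variants rely on the two streams sharing one in-memory object, which is exactly the single-JVM limitation.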