[
https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081680#comment-15081680
]
ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------
Github user gauravgopi123 commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/185#discussion_r48773375
--- Diff: engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
@@ -364,29 +413,49 @@ else if (!doCheckpoint) {
if (tracker.ports[trackerIndex] == null) {
tracker.ports[trackerIndex++] = activePort;
break;
- }
- else if (tracker.ports[trackerIndex] == activePort) {
+ } else if (tracker.ports[trackerIndex] == activePort) {
break;
}
trackerIndex++;
}
- if (trackerIndex == totalQueues) {
- trackerIterator = resetTupleTracker.iterator();
+ if (trackerIndex == regularQueues) {
+ Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
while (trackerIterator.hasNext()) {
if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
trackerIterator.remove();
}
}
- for (int s = sinks.length; s-- > 0; ) {
- sinks[s].put(t);
+ if (!delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(t);
+ }
+ controlTupleCount++;
}
- controlTupleCount++;
-
- assert (activeQueues.isEmpty());
- activeQueues.addAll(inputs.values());
+ if (!activeQueues.isEmpty()) {
+ // make sure they are all queues from DelayOperator
+ for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
+ if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
+ assert (false);
+ }
+ }
+ activeQueues.clear();
+ }
+ activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
+
+ if (firstWindowId == -1) {
+ if (delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(t);
+ }
+ // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
--- End diff --
@davidyan74 : Do we need to send reset_tuple downstream? I don't think we need
to. If we do, shouldn't we also do controlTupleCount++;?
> Iterative processing support
> ----------------------------
>
> Key: APEXCORE-60
> URL: https://issues.apache.org/jira/browse/APEXCORE-60
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: David Yan
> Assignee: David Yan
> Labels: roadmap
> Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the
> graph (known as DAG now, but no longer if we support iterative processing).
> The initial idea is as follows:
> {noformat}
>      |----|
>      v    |
> A -> B -> C -> D
> ^         |
> |---------|
> {noformat}
> C has two separate backward streams to A and B. The input ports of A and B
> that C connects to will have a special attribute on how many window IDs ahead
> the incoming windows should be treated as, and A and B will be responsible
> for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have
> C generate the initial data on its output ports to A and B.
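The second idea in the description (C advances the window ID on its output ports and supplies the initial data) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the Apex implementation: WindowDelaySketch, WindowedTuple, advance and initialData are hypothetical names, and window IDs are simplified to plain longs that increase by one per window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names come from Apex.
class WindowDelaySketch {

    /** A payload tagged with the window it belongs to (window IDs simplified to plain longs). */
    static final class WindowedTuple {
        final long windowId;
        final String payload;

        WindowedTuple(long windowId, String payload) {
            this.windowId = windowId;
            this.payload = payload;
        }
    }

    /** Re-tags tuples leaving C so that A and B see them windowsAhead windows later. */
    static List<WindowedTuple> advance(List<WindowedTuple> fromC, int windowsAhead) {
        List<WindowedTuple> shifted = new ArrayList<>();
        for (WindowedTuple t : fromC) {
            shifted.add(new WindowedTuple(t.windowId + windowsAhead, t.payload));
        }
        return shifted;
    }

    /** Seed data for the first windowsAhead windows, for which no feedback from C exists yet. */
    static List<WindowedTuple> initialData(long firstWindowId, int windowsAhead, String seed) {
        List<WindowedTuple> initial = new ArrayList<>();
        for (int i = 0; i < windowsAhead; i++) {
            initial.add(new WindowedTuple(firstWindowId + i, seed));
        }
        return initial;
    }
}
```

With windowsAhead = 1, a tuple that C emits in window 5 reaches A and B tagged as window 6, and initialData covers only the very first window, which has no feedback yet.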