[
https://issues.apache.org/jira/browse/APEXCORE-60?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15081628#comment-15081628
]
ASF GitHub Bot commented on APEXCORE-60:
----------------------------------------
Github user gauravgopi123 commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/185#discussion_r48768935
--- Diff:
engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java ---
@@ -212,30 +225,60 @@ public final void run()
long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
final boolean handleIdleTime = operator instanceof IdleTimeHandler;
int totalQueues = inputs.size();
+ int regularQueues = totalQueues;
+ // regularQueues is the number of queues that are not connected to a
DelayOperator
+ for (String portName : inputs.keySet()) {
+ if (isInputPortConnectedToDelayOperator(portName)) {
+ regularQueues--;
+ }
+ }
- ArrayList<SweepableReservoir> activeQueues = new
ArrayList<SweepableReservoir>();
- activeQueues.addAll(inputs.values());
+ ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new
ArrayList<>();
+ activeQueues.addAll(inputs.entrySet());
int expectingBeginWindow = activeQueues.size();
int receivedEndWindow = 0;
+ long firstWindowId = -1;
TupleTracker tracker;
LinkedList<TupleTracker> resetTupleTracker = new
LinkedList<TupleTracker>();
-
try {
do {
- Iterator<SweepableReservoir> buffers = activeQueues.iterator();
+ Iterator<Map.Entry<String, SweepableReservoir>> buffers =
activeQueues.iterator();
activequeue:
while (buffers.hasNext()) {
- SweepableReservoir activePort = buffers.next();
+ Map.Entry<String, SweepableReservoir> activePortEntry =
buffers.next();
+ SweepableReservoir activePort = activePortEntry.getValue();
Tuple t = activePort.sweep();
+ boolean needResetWindow = false;
if (t != null) {
+ boolean delay = (operator instanceof Operator.DelayOperator);
+ long windowAhead = 0;
+ if (delay) {
+ windowAhead =
WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis,
windowWidthMillis, 1);
+ if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead)
> t.getBaseSeconds()) {
--- End diff --
Why is this `if` condition needed here? needResetWindow is only used in
BEGIN_WINDOW tuple type (line 271). Have this `if` condition there itself
> Iterative processing support
> ----------------------------
>
> Key: APEXCORE-60
> URL: https://issues.apache.org/jira/browse/APEXCORE-60
> Project: Apache Apex Core
> Issue Type: New Feature
> Reporter: David Yan
> Assignee: David Yan
> Labels: roadmap
> Fix For: 3.3.0
>
>
> We would like to support iterative processing by introducing cycles in the
> graph (known as DAG now, but no longer if we support iterative processing).
> Initial idea is as follow:
> {noformat}
> |----|
> v |
> A -> B -> C -> D
> ^ |
> |---------|
> {noformat}
> C has two separate backward streams to A and B. The input ports of A and B
> that C connects to will have a special attribute on how many window IDs ahead
> the incoming windows should be treated as, and A and B will be responsible
> for the initial data for such input ports.
> Another idea is to have C advance the window ID on its output ports and have
> C generate the initial data on its output ports to A and B.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)