Repository: samza
Updated Branches:
  refs/heads/master d8534b5d8 -> e6cc3b713


SAMZA-1155; Validate users configure window.ms when using the fluent API

We will compute triggering duration as follows:
- If user configures `task.window.ms` we will honor it as the triggering 
duration
- If not, we will use the `GCD(windowTriggerDurations, joinTTLs)` as the 
triggering duration.

Changes in this PR:
- Common Interface for all time based triggers
- Additional APIs in `StreamGraphImpl` to recursively traverse all 
`OperatorSpec`s
- Recursive computation of `triggerInterval` for each `WindowOperatorSpec`
- Tests for all the above

Author: vjagadish1989 <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Jacob Maes 
<[email protected]>, Xinyu Liu <[email protected]>

Closes #160 from vjagadish1989/samza-1155


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6cc3b71
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6cc3b71
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6cc3b71

Branch: refs/heads/master
Commit: e6cc3b713ab42488afa0e9dd111ef0b863fc5382
Parents: d8534b5
Author: vjagadish1989 <[email protected]>
Authored: Fri May 5 19:21:29 2017 -0700
Committer: vjagadish1989 <[email protected]>
Committed: Fri May 5 19:21:29 2017 -0700

----------------------------------------------------------------------
 .../operators/triggers/RepeatingTrigger.java    |   2 +-
 .../operators/triggers/TimeBasedTrigger.java    |  36 +++++++
 .../triggers/TimeSinceFirstMessageTrigger.java  |   3 +-
 .../triggers/TimeSinceLastMessageTrigger.java   |   3 +-
 .../samza/operators/triggers/TimeTrigger.java   |   3 +-
 .../org/apache/samza/execution/JobNode.java     |  44 ++++++++
 .../apache/samza/operators/StreamGraphImpl.java |  48 +++++++++
 .../operators/spec/WindowOperatorSpec.java      |  61 +++++++++++
 .../apache/samza/operators/util/MathUtils.java  |  50 +++++++++
 .../samza/execution/TestExecutionPlanner.java   | 108 +++++++++++++++++++
 .../operators/spec/TestWindowOperatorSpec.java  |  63 +++++++++++
 .../org/apache/samza/util/TestMathUtils.java    |  62 +++++++++++
 12 files changed, 479 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index 166d0d9..2f05be8 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -21,7 +21,7 @@ package org.apache.samza.operators.triggers;
 /**
  * A {@link Trigger} that repeats its underlying trigger forever.
  */
-class RepeatingTrigger<M> implements Trigger<M> {
+public class RepeatingTrigger<M> implements Trigger<M> {
 
   private final Trigger<M> trigger;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java
 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java
new file mode 100644
index 0000000..c26f70f
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+import java.time.Duration;
+
+/**
+ * A {@link Trigger} whose firing logic is determined by a time duration.
+ *
+ * <p> Use the {@link Triggers} APIs to create a {@link Trigger}.
+ *
+ * @param <M> the type of the incoming message
+ */
+
+@InterfaceStability.Unstable
+public interface TimeBasedTrigger<M> extends Trigger<M> {
+  Duration getDuration();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 94b7769..e4f4659 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -26,7 +26,7 @@ import java.time.Duration;
  * A {@link Trigger} that fires after the specified duration has passed since 
the first {@link MessageEnvelope} in
  * the window pane.
  */
-public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger<M>, 
TimeBasedTrigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = 
DurationCharacteristic.PROCESSING_TIME;
@@ -35,6 +35,7 @@ public class TimeSinceFirstMessageTrigger<M> implements 
Trigger<M> {
     this.duration = duration;
   }
 
+  @Override
   public Duration getDuration() {
     return duration;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 2231fd4..94cafdd 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -24,7 +24,7 @@ import java.time.Duration;
  * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s 
in the window pane for the specified duration.
  * @param <M> the type of the incoming {@link MessageEnvelope}
  */
-public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {
+public class TimeSinceLastMessageTrigger<M> implements Trigger<M>, 
TimeBasedTrigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = 
DurationCharacteristic.PROCESSING_TIME;
@@ -33,6 +33,7 @@ public class TimeSinceLastMessageTrigger<M> implements 
Trigger<M> {
     this.duration = duration;
   }
 
+  @Override
   public Duration getDuration() {
     return duration;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index d854d74..875a0c1 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -23,7 +23,7 @@ import java.time.Duration;
 /*
  * A {@link Trigger} that fires after the specified duration in processing 
time.
  */
-public class TimeTrigger<M> implements Trigger<M> {
+public class TimeTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> {
 
   private final Duration duration;
   private final DurationCharacteristic characteristic = 
DurationCharacteristic.PROCESSING_TIME;
@@ -32,6 +32,7 @@ public class TimeTrigger<M> implements Trigger<M> {
     this.duration = duration;
   }
 
+  @Override
   public Duration getDuration() {
     return duration;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 0484cf9..fbad520 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -21,6 +21,7 @@ package org.apache.samza.execution;
 
 import com.google.common.base.Joiner;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,10 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +109,17 @@ public class JobNode {
 
     List<String> inputs = inEdges.stream().map(edge -> 
edge.getFormattedSystemStream()).collect(Collectors.toList());
     configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+
+    // set triggering interval if a window or join is defined
+    if (streamGraph.hasWindowOrJoins()) {
+      if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
+        long triggerInterval = computeTriggerInterval();
+        log.info("Using triggering interval: {} for jobName: {}", 
triggerInterval, jobName);
+
+        configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
+      }
+    }
+
     log.info("Job {} has generated configs {}", jobName, configs);
 
     configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
@@ -114,6 +130,34 @@ public class JobNode {
   }
 
   /**
+   * Computes the triggering interval to use during the execution of this 
{@link JobNode}
+   */
+  private long computeTriggerInterval() {
+    // Obtain the operator specs from the streamGraph
+    Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs();
+
+    // Filter out window operators, and obtain a list of their triggering 
interval values
+    List<Long> windowTimerIntervals = operatorSpecs.stream()
+        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW)
+        .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs())
+        .collect(Collectors.toList());
+
+    // Filter out the join operators, and obtain a list of their ttl values
+    List<Long> joinTtlIntervals = operatorSpecs.stream()
+        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+        .map(spec -> ((PartialJoinOperatorSpec) spec).getTtlMs())
+        .collect(Collectors.toList());
+
+    // Combine both the above lists
+    List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
+    candidateTimerIntervals.addAll(windowTimerIntervals);
+
+    // Compute the gcd of the resultant list
+    long timerInterval = MathUtils.gcd(candidateTimerIntervals);
+    return timerInterval;
+  }
+
+  /**
    * This function extract the subset of configs from the full config, and use 
it to override the generated configs
    * from the job.
    * @param fullConfig full config

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 5ba390a..31a75ce 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.operators.stream.InputStreamInternalImpl;
 import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
@@ -28,11 +29,15 @@ import 
org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A {@link StreamGraph} that provides APIs for accessing {@link 
MessageStream}s to be used to
@@ -156,4 +161,47 @@ public class StreamGraphImpl implements StreamGraph {
   /* package private */ int getNextOpId() {
     return this.opId++;
   }
+
+  /**
+   * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl}
+   *
+   * @return  a set of all available {@link OperatorSpec}s
+   */
+  public Collection<OperatorSpec> getAllOperatorSpecs() {
+    Collection<InputStreamInternal> inputStreams = inStreams.values();
+    Set<OperatorSpec> operatorSpecs = new HashSet<>();
+
+    for (InputStreamInternal stream : inputStreams) {
+      doGetOperatorSpecs((MessageStreamImpl) stream, operatorSpecs);
+    }
+    return operatorSpecs;
+  }
+
+  private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> 
specs) {
+    Collection<OperatorSpec> registeredOperatorSpecs = 
stream.getRegisteredOperatorSpecs();
+    for (OperatorSpec spec : registeredOperatorSpecs) {
+      specs.add(spec);
+      MessageStreamImpl nextStream = spec.getNextStream();
+      if (nextStream != null) {
+        //Recursively traverse and obtain all reachable operators
+        doGetOperatorSpecs(nextStream, specs);
+      }
+    }
+  }
+
+  /**
+   * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or 
a window operator
+   *
+   * @return  <tt>true</tt> iff this {@link StreamGraphImpl} contains a join 
or a window operator
+   */
+  public boolean hasWindowOrJoins() {
+    // Obtain the operator specs from the streamGraph
+    Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs();
+
+    Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream()
+        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || 
spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+        .collect(Collectors.toSet());
+
+    return windowOrJoinSpecs.size() != 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 7ea07f6..3c2be0a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,9 +20,20 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.triggers.AnyTrigger;
+import org.apache.samza.operators.triggers.RepeatingTrigger;
+import org.apache.samza.operators.triggers.TimeBasedTrigger;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.operators.util.OperatorJsonUtils;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
 
 
 /**
@@ -34,6 +45,7 @@ import 
org.apache.samza.operators.windows.internal.WindowInternal;
  */
 public class WindowOperatorSpec<M, WK, WV> implements 
OperatorSpec<WindowPane<WK, WV>> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindowOperatorSpec.class);
   private final WindowInternal<M, WK, WV> window;
   private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
   private final int opId;
@@ -76,4 +88,53 @@ public class WindowOperatorSpec<M, WK, WV> implements 
OperatorSpec<WindowPane<WK
   public String getSourceLocation() {
     return sourceLocation;
   }
+
+  /**
+   * Get the default triggering interval for this {@link WindowOperatorSpec}
+   *
+   * This is defined as the GCD of all triggering intervals across all {@link 
TimeBasedTrigger}s configured for
+   * this {@link WindowOperatorSpec}.
+   *
+   * @return the default triggering interval
+   */
+  public long getDefaultTriggerMs() {
+    List<TimeBasedTrigger> timerTriggers = new ArrayList<>();
+
+    if (window.getDefaultTrigger() != null) {
+      timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger()));
+    }
+    if (window.getEarlyTrigger() != null) {
+      timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger()));
+    }
+    if (window.getLateTrigger() != null) {
+      timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger()));
+    }
+
+    LOG.info("Got {} timer triggers", timerTriggers.size());
+
+    List<Long> candidateDurations = timerTriggers.stream()
+        .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis())
+        .collect(Collectors.toList());
+
+    return MathUtils.gcd(candidateDurations);
+  }
+
+  private List<TimeBasedTrigger> getTimeBasedTriggers(Trigger rootTrigger) {
+    List<TimeBasedTrigger> timeBasedTriggers = new ArrayList<>();
+    // traverse all triggers in the graph starting at the root trigger
+    if (rootTrigger instanceof TimeBasedTrigger) {
+      timeBasedTriggers.add((TimeBasedTrigger) rootTrigger);
+    } else if (rootTrigger instanceof RepeatingTrigger) {
+      // recurse on the underlying trigger
+      timeBasedTriggers.addAll(getTimeBasedTriggers(((RepeatingTrigger) 
rootTrigger).getTrigger()));
+    } else if (rootTrigger instanceof AnyTrigger) {
+      List<Trigger> subTriggers = ((AnyTrigger) rootTrigger).getTriggers();
+
+      for (Trigger subTrigger: subTriggers) {
+        // recurse on each sub-trigger
+        timeBasedTriggers.addAll(getTimeBasedTriggers(subTrigger));
+      }
+    }
+    return timeBasedTriggers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java 
b/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java
new file mode 100644
index 0000000..bccc3b3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.util;
+
+import java.util.List;
+
+public class MathUtils {
+
+  public static long gcd(long a, long b) {
+    // use the euclid gcd algorithm
+    while (b > 0) {
+      long temp = b;
+      b = a % b;
+      a = temp;
+    }
+    return a;
+  }
+
+  public static long gcd(List<Long> numbers) {
+    if (numbers == null) {
+      throw new IllegalArgumentException("Null list provided");
+    }
+    if (numbers.size() == 0) {
+      throw new IllegalArgumentException("List of size 0 provided");
+    }
+
+    long result = numbers.get(0);
+    for (int i = 1; i < numbers.size(); i++) {
+      result = gcd(result, numbers.get(i));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 5366dc3..daa223a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -23,10 +23,12 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
@@ -40,12 +42,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -155,6 +159,32 @@ public class TestExecutionPlanner {
     return streamGraph;
   }
 
+  private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    BiFunction msgBuilder = mock(BiFunction.class);
+    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m 
-> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", 
msgBuilder).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", 
msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    Function mockFn = mock(Function.class);
+    OutputStream<Object, Object, Object> output1 = 
streamGraph.getOutputStream("output1", mockFn, mockFn);
+    OutputStream<Object, Object, Object> output2 = 
streamGraph.getOutputStream("output2", mockFn, mockFn);
+
+    m1.map(m -> m)
+        .filter(m->true)
+        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, 
Duration.ofMillis(8)));
+
+    m2.map(m -> m)
+        .filter(m->true)
+        .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, 
Duration.ofMillis(16)));
+
+    m1.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(1600)).sendTo(output1);
+    m3.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(100)).sendTo(output2);
+    m3.join(m2, mock(JoinFunction.class), 
Duration.ofMillis(252)).sendTo(output2);
+
+    return streamGraph;
+  }
+
   @Before
   public void setup() {
     Map<String, String> configMap = new HashMap<>();
@@ -264,6 +294,84 @@ public class TestExecutionPlanner {
   }
 
   @Test
+  public void testTriggerIntervalForJoins() throws Exception {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
+    ExecutionPlan plan = planner.plan(streamGraph);
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    for (JobConfig config : jobConfigs) {
+      System.out.println(config);
+    }
+  }
+
+  @Test
+  public void testTriggerIntervalForWindowsAndJoins() throws Exception {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
+    ExecutionPlan plan = planner.plan(streamGraph);
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    assertEquals(jobConfigs.size(), 1);
+
+    // GCD of 8, 16, 1600, 100 and 252 is 4
+    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+  }
+
+  @Test
+  public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(TaskConfig.WINDOW_MS(), "-1");
+    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
+    ExecutionPlan plan = planner.plan(streamGraph);
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    assertEquals(jobConfigs.size(), 1);
+
+    // GCD of 8, 16, 1600, 100 and 252 is 4
+    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+  }
+
+
+  @Test
+  public void testTriggerIntervalForStatelessOperators() throws Exception {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamGraphImpl streamGraph = createSimpleGraph();
+    ExecutionPlan plan = planner.plan(streamGraph);
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    assertEquals(jobConfigs.size(), 1);
+    assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
+  }
+
+  @Test
+  public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(TaskConfig.WINDOW_MS(), "2000");
+    map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), 
String.valueOf(DEFAULT_PARTITIONS));
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamGraphImpl streamGraph = createSimpleGraph();
+    ExecutionPlan plan = planner.plan(streamGraph);
+    List<JobConfig> jobConfigs = plan.getJobConfigs();
+    assertEquals(jobConfigs.size(), 1);
+    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "2000");
+  }
+
+  @Test
   public void testCalculateIntStreamPartitions() throws Exception {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraphImpl streamGraph = createSimpleGraph();

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
new file mode 100644
index 0000000..affe37f
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+
+public class TestWindowOperatorSpec {
+  @Test
+  public void testTriggerIntervalWithNestedTimeTriggers() {
+    Trigger defaultTrigger = 
Triggers.timeSinceFirstMessage(Duration.ofMillis(150));
+    Trigger lateTrigger = Triggers.any(Triggers.count(6), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(15)));
+    Trigger earlyTrigger = Triggers.repeat(
+        Triggers.any(Triggers.count(23),
+            Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
+            Triggers.any(Triggers.any(Triggers.count(6),
+                Triggers.timeSinceFirstMessage(Duration.ofMillis(15)), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
+                Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));
+
+    WindowInternal window = new WindowInternal(defaultTrigger, null, null, 
null, null, WindowType.SESSION);
+    window.setEarlyTrigger(earlyTrigger);
+    window.setLateTrigger(lateTrigger);
+
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, new 
MessageStreamImpl(null), 0);
+    Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
+  }
+
+  @Test
+  public void testTriggerIntervalWithSingleTimeTrigger() {
+    Trigger defaultTrigger = 
Triggers.timeSinceFirstMessage(Duration.ofMillis(150));
+    Trigger earlyTrigger = Triggers.repeat(Triggers.count(5));
+
+    WindowInternal window = new WindowInternal(defaultTrigger, null, null, 
null, null, WindowType.SESSION);
+    window.setEarlyTrigger(earlyTrigger);
+
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, new 
MessageStreamImpl(null), 0);
+    Assert.assertEquals(spec.getDefaultTriggerMs(), 150);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java 
b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
new file mode 100644
index 0000000..46e0735
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.google.common.collect.ImmutableList;
+import junit.framework.Assert;
+import org.apache.samza.operators.util.MathUtils;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class TestMathUtils {
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGcdWithNullInputs() {
+    MathUtils.gcd(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGcdWithEmptyInputs() {
+    MathUtils.gcd(Collections.emptyList());
+  }
+
+  @Test
+  public void testGcdWithValidInputs() {
+    // gcd(x, x) = x
+    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 2L)));
+    Assert.assertEquals(15, MathUtils.gcd(ImmutableList.of(15L)));
+    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(1L)));
+
+    // gcd(0,x) = x
+    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 0L)));
+
+    // gcd(1,x) = 1
+    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 
50L, 1L)));
+
+    // other happy path test cases
+    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 
50L, 0L)));
+    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 
50L)));
+    Assert.assertEquals(5, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 
55L)));
+
+    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 
13L)));
+  }
+
+}

Reply via email to