Repository: samza Updated Branches: refs/heads/master d8534b5d8 -> e6cc3b713
SAMZA-1155; Validate that users configure window.ms when using the fluent API. We compute the triggering duration as follows: - If the user configures `task.window.ms`, we honor it as the triggering duration. - If not, and the job defines a window or a join, we use `GCD(windowTriggerDurations, joinTTLs)` as the triggering duration. Changes in this PR: - A common interface for all time-based triggers - Additional APIs in `StreamGraphImpl` to recursively traverse all `OperatorSpec`s - Recursive computation of `triggerInterval` for each `WindowOperatorSpec` - Tests for all of the above Author: vjagadish1989 <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Jacob Maes <[email protected]>, Xinyu Liu <[email protected]> Closes #160 from vjagadish1989/samza-1155 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6cc3b71 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6cc3b71 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6cc3b71 Branch: refs/heads/master Commit: e6cc3b713ab42488afa0e9dd111ef0b863fc5382 Parents: d8534b5 Author: vjagadish1989 <[email protected]> Authored: Fri May 5 19:21:29 2017 -0700 Committer: vjagadish1989 <[email protected]> Committed: Fri May 5 19:21:29 2017 -0700 ---------------------------------------------------------------------- .../operators/triggers/RepeatingTrigger.java | 2 +- .../operators/triggers/TimeBasedTrigger.java | 36 +++++++ .../triggers/TimeSinceFirstMessageTrigger.java | 3 +- .../triggers/TimeSinceLastMessageTrigger.java | 3 +- .../samza/operators/triggers/TimeTrigger.java | 3 +- .../org/apache/samza/execution/JobNode.java | 44 ++++++++ .../apache/samza/operators/StreamGraphImpl.java | 48 +++++++++ .../operators/spec/WindowOperatorSpec.java | 61 +++++++++++ .../apache/samza/operators/util/MathUtils.java | 50 +++++++++ .../samza/execution/TestExecutionPlanner.java | 108 +++++++++++++++++++ .../operators/spec/TestWindowOperatorSpec.java | 63 +++++++++++ 
.../org/apache/samza/util/TestMathUtils.java | 62 +++++++++++ 12 files changed, 479 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java index 166d0d9..2f05be8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java @@ -21,7 +21,7 @@ package org.apache.samza.operators.triggers; /** * A {@link Trigger} that repeats its underlying trigger forever. */ -class RepeatingTrigger<M> implements Trigger<M> { +public class RepeatingTrigger<M> implements Trigger<M> { private final Trigger<M> trigger; http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java new file mode 100644 index 0000000..c26f70f --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeBasedTrigger.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.annotation.InterfaceStability; + +import java.time.Duration; + +/** + * A {@link Trigger} whose firing logic is determined by a time duration. + * + * <p> Use the {@link Triggers} APIs to create a {@link Trigger}. + * + * @param <M> the type of the incoming message + */ + [email protected] +public interface TimeBasedTrigger<M> extends Trigger<M> { + Duration getDuration(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java index 94b7769..e4f4659 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java @@ -26,7 +26,7 @@ import java.time.Duration; * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in * the window pane. 
*/ -public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> { +public class TimeSinceFirstMessageTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; @@ -35,6 +35,7 @@ public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> { this.duration = duration; } + @Override public Duration getDuration() { return duration; } http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java index 2231fd4..94cafdd 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java @@ -24,7 +24,7 @@ import java.time.Duration; * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration. 
* @param <M> the type of the incoming {@link MessageEnvelope} */ -public class TimeSinceLastMessageTrigger<M> implements Trigger<M> { +public class TimeSinceLastMessageTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; @@ -33,6 +33,7 @@ public class TimeSinceLastMessageTrigger<M> implements Trigger<M> { this.duration = duration; } + @Override public Duration getDuration() { return duration; } http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java index d854d74..875a0c1 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java @@ -23,7 +23,7 @@ import java.time.Duration; /* * A {@link Trigger} that fires after the specified duration in processing time. 
*/ -public class TimeTrigger<M> implements Trigger<M> { +public class TimeTrigger<M> implements Trigger<M>, TimeBasedTrigger<M> { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; @@ -32,6 +32,7 @@ public class TimeTrigger<M> implements Trigger<M> { this.duration = duration; } + @Override public Duration getDuration() { return duration; } http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 0484cf9..fbad520 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -21,6 +21,7 @@ package org.apache.samza.execution; import com.google.common.base.Joiner; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +31,10 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.util.MathUtils; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +109,17 @@ public class JobNode { List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList()); configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); + + // set triggering interval if a window or join is defined + if (streamGraph.hasWindowOrJoins()) { 
+ if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) { + long triggerInterval = computeTriggerInterval(); + log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName); + + configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval)); + } + } + log.info("Job {} has generated configs {}", jobName, configs); configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); @@ -114,6 +130,34 @@ public class JobNode { } /** + * Computes the triggering interval to use during the execution of this {@link JobNode} + */ + private long computeTriggerInterval() { + // Obtain the operator specs from the streamGraph + Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs(); + + // Filter out window operators, and obtain a list of their triggering interval values + List<Long> windowTimerIntervals = operatorSpecs.stream() + .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW) + .map(spec -> ((WindowOperatorSpec) spec).getDefaultTriggerMs()) + .collect(Collectors.toList()); + + // Filter out the join operators, and obtain a list of their ttl values + List<Long> joinTtlIntervals = operatorSpecs.stream() + .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN) + .map(spec -> ((PartialJoinOperatorSpec) spec).getTtlMs()) + .collect(Collectors.toList()); + + // Combine both the above lists + List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals); + candidateTimerIntervals.addAll(windowTimerIntervals); + + // Compute the gcd of the resultant list + long timerInterval = MathUtils.gcd(candidateTimerIntervals); + return timerInterval; + } + + /** * This function extract the subset of configs from the full config, and use it to override the generated configs * from the job. 
* @param fullConfig full config http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index 5ba390a..31a75ce 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -20,6 +20,7 @@ package org.apache.samza.operators; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.stream.InputStreamInternal; import org.apache.samza.operators.stream.InputStreamInternalImpl; import org.apache.samza.operators.stream.IntermediateStreamInternalImpl; @@ -28,11 +29,15 @@ import org.apache.samza.operators.stream.OutputStreamInternalImpl; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.stream.Collectors; /** * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to @@ -156,4 +161,47 @@ public class StreamGraphImpl implements StreamGraph { /* package private */ int getNextOpId() { return this.opId++; } + + /** + * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl} + * + * @return a set of all available {@link OperatorSpec}s + */ + public Collection<OperatorSpec> getAllOperatorSpecs() { + Collection<InputStreamInternal> inputStreams = inStreams.values(); + Set<OperatorSpec> operatorSpecs = new HashSet<>(); + + for 
(InputStreamInternal stream : inputStreams) { + doGetOperatorSpecs((MessageStreamImpl) stream, operatorSpecs); + } + return operatorSpecs; + } + + private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> specs) { + Collection<OperatorSpec> registeredOperatorSpecs = stream.getRegisteredOperatorSpecs(); + for (OperatorSpec spec : registeredOperatorSpecs) { + specs.add(spec); + MessageStreamImpl nextStream = spec.getNextStream(); + if (nextStream != null) { + //Recursively traverse and obtain all reachable operators + doGetOperatorSpecs(nextStream, specs); + } + } + } + + /** + * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator + * + * @return <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator + */ + public boolean hasWindowOrJoins() { + // Obtain the operator specs from the streamGraph + Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs(); + + Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream() + .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN) + .collect(Collectors.toSet()); + + return windowOrJoinSpecs.size() != 0; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 7ea07f6..3c2be0a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -20,9 +20,20 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.triggers.AnyTrigger; +import 
org.apache.samza.operators.triggers.RepeatingTrigger; +import org.apache.samza.operators.triggers.TimeBasedTrigger; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.util.MathUtils; import org.apache.samza.operators.util.OperatorJsonUtils; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; /** @@ -34,6 +45,7 @@ import org.apache.samza.operators.windows.internal.WindowInternal; */ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> { + private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorSpec.class); private final WindowInternal<M, WK, WV> window; private final MessageStreamImpl<WindowPane<WK, WV>> nextStream; private final int opId; @@ -76,4 +88,53 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK public String getSourceLocation() { return sourceLocation; } + + /** + * Get the default triggering interval for this {@link WindowOperatorSpec} + * + * This is defined as the GCD of all triggering intervals across all {@link TimeBasedTrigger}s configured for + * this {@link WindowOperatorSpec}. 
+ * + * @return the default triggering interval + */ + public long getDefaultTriggerMs() { + List<TimeBasedTrigger> timerTriggers = new ArrayList<>(); + + if (window.getDefaultTrigger() != null) { + timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger())); + } + if (window.getEarlyTrigger() != null) { + timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger())); + } + if (window.getLateTrigger() != null) { + timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger())); + } + + LOG.info("Got {} timer triggers", timerTriggers.size()); + + List<Long> candidateDurations = timerTriggers.stream() + .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis()) + .collect(Collectors.toList()); + + return MathUtils.gcd(candidateDurations); + } + + private List<TimeBasedTrigger> getTimeBasedTriggers(Trigger rootTrigger) { + List<TimeBasedTrigger> timeBasedTriggers = new ArrayList<>(); + // traverse all triggers in the graph starting at the root trigger + if (rootTrigger instanceof TimeBasedTrigger) { + timeBasedTriggers.add((TimeBasedTrigger) rootTrigger); + } else if (rootTrigger instanceof RepeatingTrigger) { + // recurse on the underlying trigger + timeBasedTriggers.addAll(getTimeBasedTriggers(((RepeatingTrigger) rootTrigger).getTrigger())); + } else if (rootTrigger instanceof AnyTrigger) { + List<Trigger> subTriggers = ((AnyTrigger) rootTrigger).getTriggers(); + + for (Trigger subTrigger: subTriggers) { + // recurse on each sub-trigger + timeBasedTriggers.addAll(getTimeBasedTriggers(subTrigger)); + } + } + return timeBasedTriggers; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java new file mode 100644 index 0000000..bccc3b3 
--- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.util; + +import java.util.List; + +public class MathUtils { + + public static long gcd(long a, long b) { + // use the euclid gcd algorithm + while (b > 0) { + long temp = b; + b = a % b; + a = temp; + } + return a; + } + + public static long gcd(List<Long> numbers) { + if (numbers == null) { + throw new IllegalArgumentException("Null list provided"); + } + if (numbers.size() == 0) { + throw new IllegalArgumentException("List of size 0 provided"); + } + + long result = numbers.get(0); + for (int i = 1; i < numbers.size(); i++) { + result = gcd(result, numbers.get(i)); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 5366dc3..daa223a 100644 --- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -23,10 +23,12 @@ import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; @@ -40,12 +42,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -155,6 +159,32 @@ public class TestExecutionPlanner { return streamGraph; } + private StreamGraphImpl createStreamGraphWithJoinAndWindow() { + + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + BiFunction msgBuilder = mock(BiFunction.class); + MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); + MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true); + MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + Function mockFn = mock(Function.class); + OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, 
mockFn); + OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn); + + m1.map(m -> m) + .filter(m->true) + .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8))); + + m2.map(m -> m) + .filter(m->true) + .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16))); + + m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1); + m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2); + m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(252)).sendTo(output2); + + return streamGraph; + } + @Before public void setup() { Map<String, String> configMap = new HashMap<>(); @@ -264,6 +294,84 @@ public class TestExecutionPlanner { } @Test + public void testTriggerIntervalForJoins() throws Exception { + Map<String, String> map = new HashMap<>(config); + map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); + StreamGraphImpl streamGraph = createStreamGraphWithJoin(); + ExecutionPlan plan = planner.plan(streamGraph); + List<JobConfig> jobConfigs = plan.getJobConfigs(); + for (JobConfig config : jobConfigs) { + System.out.println(config); + } + } + + @Test + public void testTriggerIntervalForWindowsAndJoins() throws Exception { + Map<String, String> map = new HashMap<>(config); + map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); + StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); + ExecutionPlan plan = planner.plan(streamGraph); + List<JobConfig> jobConfigs = plan.getJobConfigs(); + assertEquals(jobConfigs.size(), 1); + + // GCD of 8, 16, 1600 and 252 is 4 + assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4"); + 
} + + @Test + public void testTriggerIntervalWithInvalidWindowMs() throws Exception { + Map<String, String> map = new HashMap<>(config); + map.put(TaskConfig.WINDOW_MS(), "-1"); + map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); + StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); + ExecutionPlan plan = planner.plan(streamGraph); + List<JobConfig> jobConfigs = plan.getJobConfigs(); + assertEquals(jobConfigs.size(), 1); + + // GCD of 8, 16, 1600 and 252 is 4 + assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4"); + } + + + @Test + public void testTriggerIntervalForStatelessOperators() throws Exception { + Map<String, String> map = new HashMap<>(config); + map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); + StreamGraphImpl streamGraph = createSimpleGraph(); + ExecutionPlan plan = planner.plan(streamGraph); + List<JobConfig> jobConfigs = plan.getJobConfigs(); + assertEquals(jobConfigs.size(), 1); + assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS())); + } + + @Test + public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception { + Map<String, String> map = new HashMap<>(config); + map.put(TaskConfig.WINDOW_MS(), "2000"); + map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); + StreamGraphImpl streamGraph = createSimpleGraph(); + ExecutionPlan plan = planner.plan(streamGraph); + List<JobConfig> jobConfigs = plan.getJobConfigs(); + assertEquals(jobConfigs.size(), 1); + assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "2000"); + } + + @Test public void 
testCalculateIntStreamPartitions() throws Exception { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamGraphImpl streamGraph = createSimpleGraph(); http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java new file mode 100644 index 0000000..affe37f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class TestWindowOperatorSpec { + @Test + public void testTriggerIntervalWithNestedTimeTriggers() { + Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150)); + Trigger lateTrigger = Triggers.any(Triggers.count(6), Triggers.timeSinceFirstMessage(Duration.ofMillis(15))); + Trigger earlyTrigger = Triggers.repeat( + Triggers.any(Triggers.count(23), + Triggers.timeSinceFirstMessage(Duration.ofMillis(15)), + Triggers.any(Triggers.any(Triggers.count(6), + Triggers.timeSinceFirstMessage(Duration.ofMillis(15)), Triggers.timeSinceFirstMessage(Duration.ofMillis(25)), + Triggers.timeSinceLastMessage(Duration.ofMillis(15)))))); + + WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION); + window.setEarlyTrigger(earlyTrigger); + window.setLateTrigger(lateTrigger); + + WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0); + Assert.assertEquals(spec.getDefaultTriggerMs(), 5); + } + + @Test + public void testTriggerIntervalWithSingleTimeTrigger() { + Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150)); + Trigger earlyTrigger = Triggers.repeat(Triggers.count(5)); + + WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION); + window.setEarlyTrigger(earlyTrigger); + + WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0); + Assert.assertEquals(spec.getDefaultTriggerMs(), 150); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/e6cc3b71/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java new file mode 100644 index 0000000..46e0735 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.util; + +import com.google.common.collect.ImmutableList; +import junit.framework.Assert; +import org.apache.samza.operators.util.MathUtils; +import org.junit.Test; + +import java.util.Collections; + +public class TestMathUtils { + + @Test(expected = IllegalArgumentException.class) + public void testGcdWithNullInputs() { + MathUtils.gcd(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testGcdWithEmptyInputs() { + MathUtils.gcd(Collections.emptyList()); + } + + @Test + public void testGcdWithValidInputs() { + // gcd(x, x) = x + Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 2L))); + Assert.assertEquals(15, MathUtils.gcd(ImmutableList.of(15L))); + Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(1L))); + + // gcd(0,x) = x + Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 0L))); + + // gcd(1,x) = 1 + Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L))); + + // other happy path test cases + Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L))); + Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L))); + Assert.assertEquals(5, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L))); + + Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L))); + } + +}
