Repository: samza Updated Branches: refs/heads/master 925866d9b -> 8df1e16ee
SAMZA-1211; Remove Thread.sleep() from TestJoinOperator tests Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #123 from prateekm/join-test-no-sleep Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8df1e16e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8df1e16e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8df1e16e Branch: refs/heads/master Commit: 8df1e16eee38df1c3b935186ba27e328fb4a1260 Parents: 925866d Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Mon Apr 17 12:09:25 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Apr 17 12:09:25 2017 -0700 ---------------------------------------------------------------------- .../samza/operators/impl/OperatorImplGraph.java | 2 +- .../operators/impl/PartialJoinOperatorImpl.java | 18 +++++---- .../samza/operators/TestJoinOperator.java | 42 +++++++++++--------- 3 files changed, 34 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 709f2a0..8e492dc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -180,7 +180,7 @@ public class OperatorImplGraph { } else if (operatorSpec instanceof WindowOperatorSpec) { return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { - return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context); + return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock); } throw new IllegalArgumentException( String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index b2948a3..e4cb9c2 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -18,6 +18,8 @@ */ package org.apache.samza.operators.impl; +import java.util.ArrayList; +import java.util.List; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.PartialJoinFunction; @@ -29,12 +31,10 @@ import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - /** * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream * with buffered messages of type {@code JM} in the other stream. @@ -51,21 +51,23 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn; private final long ttlMs; private final int opId; + private final Clock clock; PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source, - Config config, TaskContext context) { + Config config, TaskContext context, Clock clock) { this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn(); this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn(); this.ttlMs = partialJoinOperatorSpec.getTtlMs(); this.opId = partialJoinOperatorSpec.getOpId(); + this.clock = clock; } @Override public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { K key = thisPartialJoinFn.getKey(message); - thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, System.currentTimeMillis())); + thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis())); PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key); - long now = System.currentTimeMillis(); + long now = clock.currentTimeMillis(); if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) { RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage()); this.propagateResult(joinResult, collector, coordinator); @@ -74,7 +76,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { @Override public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - long now = System.currentTimeMillis(); + long now = clock.currentTimeMillis(); KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState(); KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all(); @@ -92,7 +94,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { iterator.close(); thisState.deleteAll(keysToRemove); - LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now); + LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now); } } http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 1180179..1135726 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -19,6 +19,10 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; @@ -33,13 +37,11 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.TestClock; +import org.apache.samza.util.Clock; +import org.apache.samza.util.SystemClock; import org.junit.Test; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -51,7 +53,7 @@ public class TestJoinOperator { @Test public void join() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -67,7 +69,7 @@ public class TestJoinOperator { @Test public void joinReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -82,7 +84,7 @@ public class TestJoinOperator { @Test public void joinNoMatch() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -96,7 +98,7 @@ public class TestJoinOperator { @Test public void joinNoMatchReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -110,7 +112,7 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKey() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -127,7 +129,7 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKeyReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -144,7 +146,7 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessages() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -166,7 +168,7 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessagesReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock()); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -188,14 +190,15 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessages() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + TestClock testClock = new TestClock(); + StreamOperatorTask sot = createStreamOperatorTask(testClock); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); - Thread.sleep(100); // 10 * ttl for join + testClock.advanceTime(100); sot.window(messageCollector, taskCoordinator); // should expire first stream messages // push messages to second stream with same key @@ -207,14 +210,15 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessagesReverse() throws Exception { - StreamOperatorTask sot = createStreamOperatorTask(); + TestClock testClock = new TestClock(); + StreamOperatorTask sot = createStreamOperatorTask(testClock); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); // push messages to second stream numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator)); - Thread.sleep(100); // 10 * ttl for join + testClock.advanceTime(100); // 10 * ttl for join sot.window(messageCollector, taskCoordinator); // should expire second stream messages // push messages to first stream with same key @@ -223,7 +227,7 @@ public class TestJoinOperator { assertTrue(output.isEmpty()); } - private StreamOperatorTask createStreamOperatorTask() throws Exception { + private StreamOperatorTask createStreamOperatorTask(Clock clock) throws Exception { ApplicationRunner runner = mock(ApplicationRunner.class); when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem")); when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2")); @@ -235,7 +239,7 @@ public class TestJoinOperator { Config config = mock(Config.class); StreamApplication sgb = new TestStreamApplication(); - StreamOperatorTask sot = new StreamOperatorTask(sgb, runner); + StreamOperatorTask sot = new StreamOperatorTask(sgb, runner, clock); sot.init(config, taskContext); return sot; }