Repository: samza
Updated Branches:
  refs/heads/master 925866d9b -> 8df1e16ee


SAMZA-1211; Remove Thread.sleep() from TestJoinOperator tests

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #123 from prateekm/join-test-no-sleep


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8df1e16e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8df1e16e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8df1e16e

Branch: refs/heads/master
Commit: 8df1e16eee38df1c3b935186ba27e328fb4a1260
Parents: 925866d
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Mon Apr 17 12:09:25 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Apr 17 12:09:25 2017 -0700

----------------------------------------------------------------------
 .../samza/operators/impl/OperatorImplGraph.java |  2 +-
 .../operators/impl/PartialJoinOperatorImpl.java | 18 +++++----
 .../samza/operators/TestJoinOperator.java       | 42 +++++++++++---------
 3 files changed, 34 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 709f2a0..8e492dc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -180,7 +180,7 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) 
operatorSpec, clock);
     } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) 
operatorSpec, source, config, context);
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) 
operatorSpec, source, config, context, clock);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));

http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index b2948a3..e4cb9c2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -29,12 +31,10 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of 
type {@code M} in this stream
  * with buffered messages of type {@code JM} in the other stream.
@@ -51,21 +51,23 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends 
OperatorImpl<M, RM> {
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
   private final int opId;
+  private final Clock clock;
 
   PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> 
partialJoinOperatorSpec, MessageStreamImpl<M> source,
-      Config config, TaskContext context) {
+      Config config, TaskContext context, Clock clock) {
     this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
     this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
     this.ttlMs = partialJoinOperatorSpec.getTtlMs();
     this.opId = partialJoinOperatorSpec.getOpId();
+    this.clock = clock;
   }
 
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
     K key = thisPartialJoinFn.getKey(message);
-    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, 
System.currentTimeMillis()));
+    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, 
clock.currentTimeMillis()));
     PartialJoinMessage<JM> otherMessage = 
otherPartialJoinFn.getState().get(key);
-    long now = System.currentTimeMillis();
+    long now = clock.currentTimeMillis();
     if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - 
ttlMs) {
       RM joinResult = thisPartialJoinFn.apply(message, 
otherMessage.getMessage());
       this.propagateResult(joinResult, collector, coordinator);
@@ -74,7 +76,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends 
OperatorImpl<M, RM> {
 
   @Override
   public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
-    long now = System.currentTimeMillis();
+    long now = clock.currentTimeMillis();
 
     KeyValueStore<K, PartialJoinMessage<M>> thisState = 
thisPartialJoinFn.getState();
     KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all();
@@ -92,7 +94,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends 
OperatorImpl<M, RM> {
     iterator.close();
     thisState.deleteAll(keysToRemove);
 
-    LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, 
System.currentTimeMillis() - now);
+    LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, 
clock.currentTimeMillis() - now);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8df1e16e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 1180179..1135726 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,6 +19,10 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -33,13 +37,11 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -51,7 +53,7 @@ public class TestJoinOperator {
 
   @Test
   public void join() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -67,7 +69,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -82,7 +84,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatch() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -96,7 +98,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatchReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -110,7 +112,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -127,7 +129,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -144,7 +146,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -166,7 +168,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -188,14 +190,15 @@ public class TestJoinOperator {
 
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    TestClock testClock = new TestClock();
+    StreamOperatorTask sot = createStreamOperatorTask(testClock);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), 
messageCollector, taskCoordinator));
 
-    Thread.sleep(100); // 10 * ttl for join
+    testClock.advanceTime(100);
     sot.window(messageCollector, taskCoordinator); // should expire first 
stream messages
 
     // push messages to second stream with same key
@@ -207,14 +210,15 @@ public class TestJoinOperator {
 
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    TestClock testClock = new TestClock();
+    StreamOperatorTask sot = createStreamOperatorTask(testClock);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), 
messageCollector, taskCoordinator));
 
-    Thread.sleep(100); // 10 * ttl for join
+    testClock.advanceTime(100); // 10 * ttl for join
     sot.window(messageCollector, taskCoordinator); // should expire second 
stream messages
 
     // push messages to first stream with same key
@@ -223,7 +227,7 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
-  private StreamOperatorTask createStreamOperatorTask() throws Exception {
+  private StreamOperatorTask createStreamOperatorTask(Clock clock) throws 
Exception {
     ApplicationRunner runner = mock(ApplicationRunner.class);
     when(runner.getStreamSpec("instream")).thenReturn(new 
StreamSpec("instream", "instream", "insystem"));
     when(runner.getStreamSpec("instream2")).thenReturn(new 
StreamSpec("instream2", "instream2", "insystem2"));
@@ -235,7 +239,7 @@ public class TestJoinOperator {
     Config config = mock(Config.class);
 
     StreamApplication sgb = new TestStreamApplication();
-    StreamOperatorTask sot = new StreamOperatorTask(sgb, runner);
+    StreamOperatorTask sot = new StreamOperatorTask(sgb, runner, clock);
     sot.init(config, taskContext);
     return sot;
   }

Reply via email to