Repository: samza
Updated Branches:
  refs/heads/master 03410b80c -> 10607f0a6


http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index 0a2214b..41973b2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.FoldLeftFunction;
@@ -90,8 +90,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsInitializer() {
-    class TimedSupplierFunction implements SupplierFunction<Collection>, TimerFunction<Object, Collection> {
+  public void testIllegalScheduledFunctionAsInitializer() {
+    class TimedSupplierFunction implements SupplierFunction<Collection>, ScheduledFunction<Object, Collection> {
 
       @Override
       public Collection get() {
@@ -99,12 +99,12 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Collection> onTimer(Object key, long timestamp) {
+      public Collection<Collection> onCallback(Object key, long timestamp) {
         return null;
       }
     }
@@ -138,8 +138,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsKeyFn() {
-    class TimerMapFunction implements MapFunction<Object, Object>, TimerFunction<Object, Object> {
+  public void testIllegalScheduledFunctionAsKeyFn() {
+    class ScheduledMapFunction implements MapFunction<Object, Object>, ScheduledFunction<Object, Object> {
 
       @Override
       public Object apply(Object message) {
@@ -147,16 +147,16 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Object> onTimer(Object key, long timestamp) {
+      public Collection<Object> onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    keyFn = new TimerMapFunction();
+    keyFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -186,8 +186,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsEventTimeFn() {
-    class TimerMapFunction implements MapFunction<Object, Long>, TimerFunction<Object, Object> {
+  public void testIllegalScheduledFunctionAsEventTimeFn() {
+    class ScheduledMapFunction implements MapFunction<Object, Long>, ScheduledFunction<Object, Object> {
 
       @Override
       public Long apply(Object message) {
@@ -195,16 +195,16 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Object> onTimer(Object key, long timestamp) {
+      public Collection<Object> onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    timeFn = new TimerMapFunction();
+    timeFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -234,8 +234,9 @@ public class TestWindowOperatorSpec {
   }
 
   @Test
-  public void testTimerFunctionAsFoldLeftFn() {
-    class TimerFoldLeftFunction implements FoldLeftFunction<Object, Collection>, TimerFunction<Object, Collection> {
+  public void testScheduledFunctionAsFoldLeftFn() {
+    class ScheduledFoldLeftFunction
+        implements FoldLeftFunction<Object, Collection>, ScheduledFunction<Object, Collection> {
 
       @Override
       public Collection apply(Object message, Collection oldValue) {
@@ -244,19 +245,19 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Collection> onTimer(Object key, long timestamp) {
+      public Collection<Collection> onCallback(Object key, long timestamp) {
         return null;
       }
     }
 
-    foldFn = new TimerFoldLeftFunction();
+    foldFn = new ScheduledFoldLeftFunction();
     WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0");
-    assertEquals(windowSpec.getTimerFn(), foldFn);
+    assertEquals(windowSpec.getScheduledFn(), foldFn);
     assertNull(windowSpec.getWatermarkFn());
   }
 
@@ -284,7 +285,7 @@ public class TestWindowOperatorSpec {
     foldFn = new WatermarkFoldLeftFunction();
     WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0");
     assertEquals(windowSpec.getWatermarkFn(), foldFn);
-    assertNull(windowSpec.getTimerFn());
+    assertNull(windowSpec.getScheduledFn());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
new file mode 100644
index 0000000..e0da2e9
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestEpochTimeScheduler {
+
+  private ScheduledExecutorService createExecutorService() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> {
+        Object[] args = invocation.getArguments();
+        Runnable runnable = (Runnable) args[0];
+        runnable.run();
+        return mock(ScheduledFuture.class);
+      });
+    return service;
+  }
+
+  private void fireTimers(EpochTimeScheduler factory) {
+    factory.removeReadyTimers().entrySet().forEach(entry -> {
+        entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class));
+      });
+  }
+
+  @Test
+  public void testSingleTimer() {
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> {
+        results.add(key);
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 1);
+    assertEquals(results.get(0), "single-timer");
+  }
+
+  @Test
+  public void testMultipleTimers() {
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> {
+        results.add(key + ":3");
+      });
+    scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> {
+        results.add(key + ":2");
+      });
+    scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> {
+        results.add(key + ":1");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 3);
+    assertEquals(results.get(0), "multiple-timer-1:1");
+    assertEquals(results.get(1), "multiple-timer-2:2");
+    assertEquals(results.get(2), "multiple-timer-3:3");
+  }
+
+  @Test
+  public void testMultipleKeys() {
+    Object key1 = new Object();
+    Object key2 = new Object();
+    List<String> results = new ArrayList<>();
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 2, (key, collector, coordinator) -> {
+        assertEquals(key, key1);
+        results.add("key1:2");
+      });
+    scheduler.setTimer(key2, 1, (key, collector, coordinator) -> {
+        assertEquals(key, key2);
+        results.add("key2:1");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 2);
+    assertEquals(results.get(0), "key2:1");
+    assertEquals(results.get(1), "key1:2");
+  }
+
+  @Test
+  public void testMultipleKeyTypes() {
+    String key1 = "key";
+    Long key2 = Long.MAX_VALUE;
+    List<String> results = new ArrayList<>();
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 1, (key, collector, coordinator) -> {
+        assertEquals(key, key1);
+        results.add("key:1");
+      });
+    scheduler.setTimer(key2, 2, (key, collector, coordinator) -> {
+        assertEquals(key.longValue(), Long.MAX_VALUE);
+        results.add(Long.MAX_VALUE + ":2");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 2);
+    assertEquals(results.get(0), key1 + ":1");
+    assertEquals(results.get(1), key2 + ":2");
+  }
+
+  @Test
+  public void testRemoveTimer() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    ScheduledFuture future = mock(ScheduledFuture.class);
+    when(future.cancel(anyBoolean())).thenReturn(true);
+    when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future);
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(service);
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("timer", 1, (key, collector, coordinator) -> {
+        results.add(key);
+      });
+
+    scheduler.deleteTimer("timer");
+
+    fireTimers(scheduler);
+
+    assertTrue(results.isEmpty());
+    verify(future, times(1)).cancel(anyBoolean());
+  }
+
+  @Test
+  public void testTimerListener() {
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.registerListener(() -> {
+        results.add("timer-listener");
+      });
+
+    scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> {
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
deleted file mode 100644
index dd08121..0000000
--- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class TestSystemTimerScheduler {
-
-  private ScheduledExecutorService createExecutorService() {
-    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
-    when(service.schedule((Runnable) anyObject(), anyLong(), 
anyObject())).thenAnswer(invocation -> {
-        Object[] args = invocation.getArguments();
-        Runnable runnable = (Runnable) args[0];
-        runnable.run();
-        return mock(ScheduledFuture.class);
-      });
-    return service;
-  }
-
-  private void fireTimers(SystemTimerScheduler factory) {
-    factory.removeReadyTimers().entrySet().forEach(entry -> {
-        entry.getValue().onTimer(entry.getKey().getKey(), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
-      });
-  }
-
-  @Test
-  public void testSingleTimer() {
-    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
-    List<String> results = new ArrayList<>();
-    scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> {
-        results.add(key);
-      });
-
-    fireTimers(scheduler);
-
-    assertTrue(results.size() == 1);
-    assertEquals(results.get(0), "single-timer");
-  }
-
-  @Test
-  public void testMultipleTimers() {
-    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
-    List<String> results = new ArrayList<>();
-    scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> 
{
-        results.add(key + ":3");
-      });
-    scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> 
{
-        results.add(key + ":2");
-      });
-    scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> 
{
-        results.add(key + ":1");
-      });
-
-    fireTimers(scheduler);
-
-    assertTrue(results.size() == 3);
-    assertEquals(results.get(0), "multiple-timer-1:1");
-    assertEquals(results.get(1), "multiple-timer-2:2");
-    assertEquals(results.get(2), "multiple-timer-3:3");
-  }
-
-  @Test
-  public void testMultipleKeys() {
-    Object key1 = new Object();
-    Object key2 = new Object();
-    List<String> results = new ArrayList<>();
-
-    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
-    scheduler.setTimer(key1, 2, (key, collector, coordinator) -> {
-        assertEquals(key, key1);
-        results.add("key1:2");
-      });
-    scheduler.setTimer(key2, 1, (key, collector, coordinator) -> {
-        assertEquals(key, key2);
-        results.add("key2:1");
-      });
-
-    fireTimers(scheduler);
-
-    assertTrue(results.size() == 2);
-    assertEquals(results.get(0), "key2:1");
-    assertEquals(results.get(1), "key1:2");
-  }
-
-  @Test
-  public void testMultipleKeyTypes() {
-    String key1 = "key";
-    Long key2 = Long.MAX_VALUE;
-    List<String> results = new ArrayList<>();
-
-    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
-    scheduler.setTimer(key1, 1, (key, collector, coordinator) -> {
-        assertEquals(key, key1);
-        results.add("key:1");
-      });
-    scheduler.setTimer(key2, 2, (key, collector, coordinator) -> {
-        assertEquals(key.longValue(), Long.MAX_VALUE);
-        results.add(Long.MAX_VALUE + ":2");
-      });
-
-    fireTimers(scheduler);
-
-    assertTrue(results.size() == 2);
-    assertEquals(results.get(0), key1 + ":1");
-    assertEquals(results.get(1), key2 + ":2");
-  }
-
-  @Test
-  public void testRemoveTimer() {
-    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
-    ScheduledFuture future = mock(ScheduledFuture.class);
-    when(future.cancel(anyBoolean())).thenReturn(true);
-    when(service.schedule((Runnable) anyObject(), anyLong(), 
anyObject())).thenAnswer(invocation -> future);
-
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(service);
-    List<String> results = new ArrayList<>();
-    scheduler.setTimer("timer", 1, (key, collector, coordinator) -> {
-        results.add(key);
-      });
-
-    scheduler.deleteTimer("timer");
-
-    fireTimers(scheduler);
-
-    assertTrue(results.isEmpty());
-    verify(future, times(1)).cancel(anyBoolean());
-  }
-
-  @Test
-  public void testTimerListener() {
-    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
-    List<String> results = new ArrayList<>();
-    scheduler.registerListener(() -> {
-        results.add("timer-listener");
-      });
-
-    scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> {
-      });
-
-    fireTimers(scheduler);
-
-    assertTrue(results.size() == 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 1acfc47..3046c1f 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -39,7 +39,7 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
@@ -192,7 +192,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
       }
 
       @Override
-      public TimerFunction getTimerFn() {
+      public ScheduledFunction getScheduledFn() {
         return null;
       }
     };

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
new file mode 100644
index 0000000..658492a
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.samza.test.framework.TestSchedulingApp.*;
+
+public class SchedulingTest extends StreamApplicationIntegrationTestHarness {
+
+  @Before
+  public void setup() {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+  }
+
+  @Test
+  public void testJob() throws InterruptedException {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
+    configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+
+    runApplication(new TestSchedulingApp(), "SchedulingTest", configs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
new file mode 100644
index 0000000..db78e8c
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.test.operator.data.PageView;
+
+public class TestSchedulingApp implements StreamApplication {
+  public static final String PAGE_VIEWS = "page-views";
+
+  @Override
+  public void describe(StreamApplicationDescriptor appDesc) {
+    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
+    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
+    final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
+    final MessageStream<PageView> output = pageViews.flatMap(new FlatmapScheduledFn());
+
+    MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde)
+        .containsInAnyOrder(
+            Arrays.asList(
+                new PageView("v1-complete", "p1", "u1"),
+                new PageView("v2-complete", "p2", "u1"),
+                new PageView("v3-complete", "p1", "u2"),
+                new PageView("v4-complete", "p3", "u2")
+            ));
+  }
+
+  private static class FlatmapScheduledFn
+      implements FlatMapFunction<PageView, PageView>, ScheduledFunction<String, PageView> {
+
+    private transient List<PageView> pageViews;
+    private transient Scheduler<String> scheduler;
+
+    @Override
+    public void schedule(Scheduler<String> scheduler) {
+      this.scheduler = scheduler;
+      this.pageViews = new ArrayList<>();
+    }
+
+    @Override
+    public Collection<PageView> apply(PageView message) {
+      final PageView pv = new PageView(message.getViewId() + "-complete", message.getPageId(), message.getUserId());
+      pageViews.add(pv);
+
+      if (pageViews.size() == 2) {
+        //got all messages for this task
+        final long time = System.currentTimeMillis() + 100;
+        scheduler.schedule("CompleteScheduler", time);
+      }
+      return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<PageView> onCallback(String key, long time) {
+      return pageViews;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
deleted file mode 100644
index e72a965..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
-import org.apache.samza.test.operator.data.PageView;
-
-public class TestTimerApp implements StreamApplication {
-  public static final String PAGE_VIEWS = "page-views";
-
-  @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
-    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
-    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
-    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, 
serde);
-    final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
-    final MessageStream<PageView> output = pageViews.flatMap(new 
FlatmapTimerFn());
-
-    MessageStreamAssert.that("Output from timer function should container all 
complete messages", output, serde)
-        .containsInAnyOrder(
-            Arrays.asList(
-                new PageView("v1-complete", "p1", "u1"),
-                new PageView("v2-complete", "p2", "u1"),
-                new PageView("v3-complete", "p1", "u2"),
-                new PageView("v4-complete", "p3", "u2")
-            ));
-  }
-
-  private static class FlatmapTimerFn implements FlatMapFunction<PageView, 
PageView>, TimerFunction<String, PageView> {
-
-    private transient List<PageView> pageViews;
-    private transient TimerRegistry<String> timerRegistry;
-
-    @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
-      this.timerRegistry = timerRegistry;
-      this.pageViews = new ArrayList<>();
-    }
-
-    @Override
-    public Collection<PageView> apply(PageView message) {
-      final PageView pv = new PageView(message.getViewId() + "-complete", 
message.getPageId(), message.getUserId());
-      pageViews.add(pv);
-
-      if (pageViews.size() == 2) {
-        //got all messages for this task
-        final long time = System.currentTimeMillis() + 100;
-        timerRegistry.register("CompleteTimer", time);
-      }
-      return Collections.emptyList();
-    }
-
-    @Override
-    public Collection<PageView> onTimer(String key, long time) {
-      return pageViews;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
deleted file mode 100644
index d4e0e14..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.samza.test.framework.TestTimerApp.*;
-
-public class TimerTest extends StreamApplicationIntegrationTestHarness {
-
-  @Before
-  public void setup() {
-    // create topics
-    createTopic(PAGE_VIEWS, 2);
-
-    // create events for the following user activity.
-    // userId: (viewId, pageId, (adIds))
-    // u1: (v1, p1, (a1)), (v2, p2, (a3))
-    // u2: (v3, p1, (a1)), (v4, p3, (a5))
-    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
-    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
-    produceMessage(PAGE_VIEWS, 1, "p3", 
"{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
-
-  }
-
-  @Test
-  public void testJob() throws InterruptedException {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put("job.systemstreampartition.grouper.factory", 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
-    configs.put("task.name.grouper.factory", 
"org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
-    configs.put(JobConfig.PROCESSOR_ID(), "0");
-
-    runApplication(new TestTimerApp(), "TimerTest", configs);
-  }
-}

Reply via email to