Repository: samza Updated Branches: refs/heads/master 03410b80c -> 10607f0a6
http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java index 0a2214b..41973b2 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java @@ -19,8 +19,8 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.Scheduler; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.FoldLeftFunction; @@ -90,8 +90,8 @@ public class TestWindowOperatorSpec { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsInitializer() { - class TimedSupplierFunction implements SupplierFunction<Collection>, TimerFunction<Object, Collection> { + public void testIllegalScheduledFunctionAsInitializer() { + class TimedSupplierFunction implements SupplierFunction<Collection>, ScheduledFunction<Object, Collection> { @Override public Collection get() { @@ -99,12 +99,12 @@ public class TestWindowOperatorSpec { } @Override - public void registerTimer(TimerRegistry<Object> timerRegistry) { + public void schedule(Scheduler<Object> scheduler) { } @Override - public Collection<Collection> onTimer(Object key, long timestamp) { + public Collection<Collection> onCallback(Object key, long timestamp) { return null; } } @@ -138,8 +138,8 @@ public class TestWindowOperatorSpec { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsKeyFn() { - class TimerMapFunction implements MapFunction<Object, Object>, TimerFunction<Object, Object> { + public void testIllegalScheduledFunctionAsKeyFn() { + class ScheduledMapFunction implements MapFunction<Object, Object>, ScheduledFunction<Object, Object> { @Override public Object apply(Object message) { @@ -147,16 +147,16 @@ public class TestWindowOperatorSpec { } @Override - public void registerTimer(TimerRegistry<Object> timerRegistry) { + public void schedule(Scheduler<Object> scheduler) { } @Override - public Collection<Object> onTimer(Object key, long timestamp) { + public Collection<Object> onCallback(Object key, long timestamp) { return null; } } - keyFn = new TimerMapFunction(); + keyFn = new ScheduledMapFunction(); getWindowOperatorSpec("w0"); } @@ -186,8 +186,8 @@ public class TestWindowOperatorSpec { } @Test(expected = IllegalArgumentException.class) - public void testIllegalTimerFunctionAsEventTimeFn() { - class TimerMapFunction implements MapFunction<Object, Long>, TimerFunction<Object, Object> { + public void testIllegalScheduledFunctionAsEventTimeFn() { + class ScheduledMapFunction implements MapFunction<Object, Long>, ScheduledFunction<Object, Object> { @Override public Long apply(Object message) { @@ -195,16 +195,16 @@ public class TestWindowOperatorSpec { } @Override - public void registerTimer(TimerRegistry<Object> timerRegistry) { + public void schedule(Scheduler<Object> scheduler) { } @Override - public Collection<Object> onTimer(Object key, long timestamp) { + public Collection<Object> onCallback(Object key, long timestamp) { return null; } } - timeFn = new TimerMapFunction(); + timeFn = new ScheduledMapFunction(); getWindowOperatorSpec("w0"); } @@ -234,8 +234,9 @@ public class TestWindowOperatorSpec { } @Test - public void testTimerFunctionAsFoldLeftFn() { - class TimerFoldLeftFunction implements FoldLeftFunction<Object, Collection>, TimerFunction<Object, Collection> { + public void testScheduledFunctionAsFoldLeftFn() { + class ScheduledFoldLeftFunction + implements FoldLeftFunction<Object, Collection>, ScheduledFunction<Object, Collection> { @Override public Collection apply(Object message, Collection oldValue) { @@ -244,19 +245,19 @@ public class TestWindowOperatorSpec { } @Override - public void registerTimer(TimerRegistry<Object> timerRegistry) { + public void schedule(Scheduler<Object> scheduler) { } @Override - public Collection<Collection> onTimer(Object key, long timestamp) { + public Collection<Collection> onCallback(Object key, long timestamp) { return null; } } - foldFn = new TimerFoldLeftFunction(); + foldFn = new ScheduledFoldLeftFunction(); WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0"); - assertEquals(windowSpec.getTimerFn(), foldFn); + assertEquals(windowSpec.getScheduledFn(), foldFn); assertNull(windowSpec.getWatermarkFn()); } @@ -284,7 +285,7 @@ public class TestWindowOperatorSpec { foldFn = new WatermarkFoldLeftFunction(); WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0"); assertEquals(windowSpec.getWatermarkFn(), foldFn); - assertNull(windowSpec.getTimerFn()); + assertNull(windowSpec.getScheduledFn()); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java new file mode 100644 index 0000000..e0da2e9 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestEpochTimeScheduler { + + private ScheduledExecutorService createExecutorService() { + ScheduledExecutorService service = mock(ScheduledExecutorService.class); + when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Runnable runnable = (Runnable) args[0]; + runnable.run(); + return mock(ScheduledFuture.class); + }); + return service; + } + + private void fireTimers(EpochTimeScheduler factory) { + factory.removeReadyTimers().entrySet().forEach(entry -> { + entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); + }); + } + + @Test + public void testSingleTimer() { + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); + List<String> results = new ArrayList<>(); + scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> { + results.add(key); + }); + + fireTimers(scheduler); + + assertTrue(results.size() == 1); + assertEquals(results.get(0), "single-timer"); + } + + @Test + public void testMultipleTimers() { + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); + List<String> results = new ArrayList<>(); + scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> { + results.add(key + ":3"); + }); + scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> { + results.add(key + ":2"); + }); + scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> { + results.add(key + ":1"); + }); + + fireTimers(scheduler); + + assertTrue(results.size() == 3); + assertEquals(results.get(0), "multiple-timer-1:1"); + assertEquals(results.get(1), "multiple-timer-2:2"); + assertEquals(results.get(2), "multiple-timer-3:3"); + } + + @Test + public void testMultipleKeys() { + Object key1 = new Object(); + Object key2 = new Object(); + List<String> results = new ArrayList<>(); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); + scheduler.setTimer(key1, 2, (key, collector, coordinator) -> { + assertEquals(key, key1); + results.add("key1:2"); + }); + scheduler.setTimer(key2, 1, (key, collector, coordinator) -> { + assertEquals(key, key2); + results.add("key2:1"); + }); + + fireTimers(scheduler); + + assertTrue(results.size() == 2); + assertEquals(results.get(0), "key2:1"); + assertEquals(results.get(1), "key1:2"); + } + + @Test + public void testMultipleKeyTypes() { + String key1 = "key"; + Long key2 = Long.MAX_VALUE; + List<String> results = new ArrayList<>(); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); + scheduler.setTimer(key1, 1, (key, collector, coordinator) -> { + assertEquals(key, key1); + results.add("key:1"); + }); + scheduler.setTimer(key2, 2, (key, collector, coordinator) -> { + assertEquals(key.longValue(), Long.MAX_VALUE); + results.add(Long.MAX_VALUE + ":2"); + }); + + fireTimers(scheduler); + + assertTrue(results.size() == 2); + assertEquals(results.get(0), key1 + ":1"); + assertEquals(results.get(1), key2 + ":2"); + } + + @Test + public void testRemoveTimer() { + ScheduledExecutorService service = mock(ScheduledExecutorService.class); + ScheduledFuture future = mock(ScheduledFuture.class); + when(future.cancel(anyBoolean())).thenReturn(true); + when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future); + + EpochTimeScheduler scheduler = EpochTimeScheduler.create(service); + List<String> results = new ArrayList<>(); + scheduler.setTimer("timer", 1, (key, collector, coordinator) -> { + results.add(key); + }); + + scheduler.deleteTimer("timer"); + + fireTimers(scheduler); + + assertTrue(results.isEmpty()); + verify(future, times(1)).cancel(anyBoolean()); + } + + @Test + public void testTimerListener() { + EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService()); + List<String> results = new ArrayList<>(); + scheduler.registerListener(() -> { + results.add("timer-listener"); + }); + + scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> { + }); + + fireTimers(scheduler); + + assertTrue(results.size() == 1); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java deleted file mode 100644 index dd08121..0000000 --- a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class TestSystemTimerScheduler { - - private ScheduledExecutorService createExecutorService() { - ScheduledExecutorService service = mock(ScheduledExecutorService.class); - when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> { - Object[] args = invocation.getArguments(); - Runnable runnable = (Runnable) args[0]; - runnable.run(); - return mock(ScheduledFuture.class); - }); - return service; - } - - private void fireTimers(SystemTimerScheduler factory) { - factory.removeReadyTimers().entrySet().forEach(entry -> { - entry.getValue().onTimer(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)); - }); - } - - @Test - public void testSingleTimer() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); - List<String> results = new ArrayList<>(); - scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> { - results.add(key); - }); - - fireTimers(scheduler); - - assertTrue(results.size() == 1); - assertEquals(results.get(0), "single-timer"); - } - - @Test - public void testMultipleTimers() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); - List<String> results = new ArrayList<>(); - scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> { - results.add(key + ":3"); - }); - scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> { - results.add(key + ":2"); - }); - scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> { - results.add(key + ":1"); - }); - - fireTimers(scheduler); - - assertTrue(results.size() == 3); - assertEquals(results.get(0), "multiple-timer-1:1"); - assertEquals(results.get(1), "multiple-timer-2:2"); - assertEquals(results.get(2), "multiple-timer-3:3"); - } - - @Test - public void testMultipleKeys() { - Object key1 = new Object(); - Object key2 = new Object(); - List<String> results = new ArrayList<>(); - - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); - scheduler.setTimer(key1, 2, (key, collector, coordinator) -> { - assertEquals(key, key1); - results.add("key1:2"); - }); - scheduler.setTimer(key2, 1, (key, collector, coordinator) -> { - assertEquals(key, key2); - results.add("key2:1"); - }); - - fireTimers(scheduler); - - assertTrue(results.size() == 2); - assertEquals(results.get(0), "key2:1"); - assertEquals(results.get(1), "key1:2"); - } - - @Test - public void testMultipleKeyTypes() { - String key1 = "key"; - Long key2 = Long.MAX_VALUE; - List<String> results = new ArrayList<>(); - - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); - scheduler.setTimer(key1, 1, (key, collector, coordinator) -> { - assertEquals(key, key1); - results.add("key:1"); - }); - scheduler.setTimer(key2, 2, (key, collector, coordinator) -> { - assertEquals(key.longValue(), Long.MAX_VALUE); - results.add(Long.MAX_VALUE + ":2"); - }); - - fireTimers(scheduler); - - assertTrue(results.size() == 2); - assertEquals(results.get(0), key1 + ":1"); - assertEquals(results.get(1), key2 + ":2"); - } - - @Test - public void testRemoveTimer() { - ScheduledExecutorService service = mock(ScheduledExecutorService.class); - ScheduledFuture future = mock(ScheduledFuture.class); - when(future.cancel(anyBoolean())).thenReturn(true); - when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future); - - SystemTimerScheduler scheduler = SystemTimerScheduler.create(service); - List<String> results = new ArrayList<>(); - scheduler.setTimer("timer", 1, (key, collector, coordinator) -> { - results.add(key); - }); - - scheduler.deleteTimer("timer"); - - fireTimers(scheduler); - - assertTrue(results.isEmpty()); - verify(future, times(1)).cancel(anyBoolean()); - } - - @Test - public void testTimerListener() { - SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService()); - List<String> results = new ArrayList<>(); - scheduler.registerListener(() -> { - results.add("timer-listener"); - }); - - scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> { - }); - - fireTimers(scheduler); - - assertTrue(results.size() == 1); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java index 1acfc47..3046c1f 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java @@ -39,7 +39,7 @@ import org.apache.samza.container.TaskName; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.ScheduledFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; @@ -192,7 +192,7 @@ public class TestProjectTranslator extends TranslatorTestBase { } @Override - public TimerFunction getTimerFn() { + public ScheduledFunction getScheduledFn() { return null; } }; http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java new file mode 100644 index 0000000..658492a --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.samza.test.framework.TestSchedulingApp.*; + +public class SchedulingTest extends StreamApplicationIntegrationTestHarness { + + @Before + public void setup() { + // create topics + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}"); + produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}"); + + } + + @Test + public void testJob() throws InterruptedException { + Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); + configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + + runApplication(new TestSchedulingApp(), "SchedulingTest", configs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java new file mode 100644 index 0000000..db78e8c --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.operators.Scheduler; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.ScheduledFunction; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.test.operator.data.PageView; + +public class TestSchedulingApp implements StreamApplication { + public static final String PAGE_VIEWS = "page-views"; + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); + KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); + final MessageStream<PageView> pageViews = appDesc.getInputStream(isd); + final MessageStream<PageView> output = pageViews.flatMap(new FlatmapScheduledFn()); + + MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde) + .containsInAnyOrder( + Arrays.asList( + new PageView("v1-complete", "p1", "u1"), + new PageView("v2-complete", "p2", "u1"), + new PageView("v3-complete", "p1", "u2"), + new PageView("v4-complete", "p3", "u2") + )); + } + + private static class FlatmapScheduledFn + implements FlatMapFunction<PageView, PageView>, ScheduledFunction<String, PageView> { + + private transient List<PageView> pageViews; + private transient Scheduler<String> scheduler; + + @Override + public void schedule(Scheduler<String> scheduler) { + this.scheduler = scheduler; + this.pageViews = new ArrayList<>(); + } + + @Override + public Collection<PageView> apply(PageView message) { + final PageView pv = new PageView(message.getViewId() + "-complete", message.getPageId(), message.getUserId()); + pageViews.add(pv); + + if (pageViews.size() == 2) { + //got all messages for this task + final long time = System.currentTimeMillis() + 100; + scheduler.schedule("CompleteScheduler", time); + } + return Collections.emptyList(); + } + + @Override + public Collection<PageView> onCallback(String key, long time) { + return pageViews; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java deleted file mode 100644 index e72a965..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.framework; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.StreamApplicationDescriptor; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.TimerRegistry; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.system.kafka.KafkaInputDescriptor; -import org.apache.samza.system.kafka.KafkaSystemDescriptor; -import org.apache.samza.test.operator.data.PageView; - -public class TestTimerApp implements StreamApplication { - public static final String PAGE_VIEWS = "page-views"; - - @Override - public void describe(StreamApplicationDescriptor appDesc) { - final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class); - KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka"); - KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde); - final MessageStream<PageView> pageViews = appDesc.getInputStream(isd); - final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn()); - - MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde) - .containsInAnyOrder( - Arrays.asList( - new PageView("v1-complete", "p1", "u1"), - new PageView("v2-complete", "p2", "u1"), - new PageView("v3-complete", "p1", "u2"), - new PageView("v4-complete", "p3", "u2") - )); - } - - private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> { - - private transient List<PageView> pageViews; - private transient TimerRegistry<String> timerRegistry; - - @Override - public void registerTimer(TimerRegistry<String> timerRegistry) { - this.timerRegistry = timerRegistry; - this.pageViews = new ArrayList<>(); - } - - @Override - public Collection<PageView> apply(PageView message) { - final PageView pv = new PageView(message.getViewId() + "-complete", message.getPageId(), message.getUserId()); - pageViews.add(pv); - - if (pageViews.size() == 2) { - //got all messages for this task - final long time = System.currentTimeMillis() + 100; - timerRegistry.register("CompleteTimer", time); - } - return Collections.emptyList(); - } - - @Override - public Collection<PageView> onTimer(String key, long time) { - return pageViews; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java deleted file mode 100644 index d4e0e14..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.framework; - -import java.util.HashMap; -import java.util.Map; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.samza.test.framework.TestTimerApp.*; - -public class TimerTest extends StreamApplicationIntegrationTestHarness { - - @Before - public void setup() { - // create topics - createTopic(PAGE_VIEWS, 2); - - // create events for the following user activity. - // userId: (viewId, pageId, (adIds)) - // u1: (v1, p1, (a1)), (v2, p2, (a3)) - // u2: (v3, p1, (a1)), (v4, p3, (a5)) - produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); - produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); - produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}"); - produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}"); - - } - - @Test - public void testJob() throws InterruptedException { - Map<String, String> configs = new HashMap<>(); - configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); - configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); - configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); - configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); - configs.put(JobConfig.PROCESSOR_ID(), "0"); - - runApplication(new TestTimerApp(), "TimerTest", configs); - } -}
