http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..9ca4f35
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example implementation of a task that splits its input into multiple output streams: the single
+ * input stream is fanned out to three filters, each forwarding the messages of one key.
+ */
+public class BroadcastExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+
+    StreamApplication app = new BroadcastExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    KVSerde<String, PageViewEvent> pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("pageViewEventStream", pgeMsgSerde);
+
+    // All three filters consume the same input stream. Comparing against the literal keeps a
+    // message whose key is null from throwing a NullPointerException.
+    inputStream.filter(m -> "key1".equals(m.key)).sendTo(graph.getOutputStream("outStream1", pgeMsgSerde));
+    inputStream.filter(m -> "key2".equals(m.key)).sendTo(graph.getOutputStream("outStream2", pgeMsgSerde));
+    inputStream.filter(m -> "key3".equals(m.key)).sendTo(graph.getOutputStream("outStream3", pgeMsgSerde));
+  }
+
+  /** Page-view payload; {@code static} so instances do not capture an enclosing BroadcastExample. */
+  static class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..9edaabe
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Example code using {@link KeyValueStore} to implement event-time windows: page views are counted
+ * per member in 5-minute windows derived from each event's own timestamp.
+ *
+ * <p>An updated count for a window is emitted at most once every 10 minutes; that throttle uses
+ * wall-clock (processing) time, not event time.
+ */
+public class KeyValueStoreExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    KeyValueStoreExample app = new KeyValueStoreExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
+
+    pageViewEvents
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+        .map(KV::getValue)
+        .flatMap(new MyStatsCounter())
+        .map(stats -> KV.of(stats.memberId, stats))
+        .sendTo(pageViewEventPerMember);
+  }
+
+  /**
+   * Counts page views per member and window, keeping the running state in the
+   * {@code my-stats-wnd-store} key-value store.
+   */
+  static class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+    // minimum wall-clock time between two outputs for the same window
+    private static final long TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10);
+    // size of the event-time windows, in minutes
+    private static final long WINDOW_SIZE_MINUTES = 5;
+
+    KeyValueStore<String, StatsWindowState> statsStore;
+
+    // static: the stored state does not need (or capture) a reference to the enclosing counter
+    static class StatsWindowState {
+      int lastCount = 0;
+      long timeAtLastOutput = 0;
+      int newCount = 0;
+    }
+
+    @Override
+    public Collection<StatsOutput> apply(PageViewEvent message) {
+      List<StatsOutput> outputStats = new ArrayList<>();
+      // Start of the event's window, in minutes since the epoch. Integer division already rounds
+      // down for non-negative values, so no floating-point Math.floor round trip is needed.
+      long eventMinutes = TimeUnit.MILLISECONDS.toMinutes(message.timestamp);
+      long wndTimestamp = (eventMinutes / WINDOW_SIZE_MINUTES) * WINDOW_SIZE_MINUTES;
+      String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
+      StatsWindowState curState = this.statsStore.get(wndKey);
+      if (curState == null) {
+        curState = new StatsWindowState();
+      }
+      curState.newCount++;
+      long curTimeMs = System.currentTimeMillis();
+      // newCount was just incremented, so only the wall-clock throttle decides whether to emit
+      if (curState.timeAtLastOutput + TIMEOUT_MS < curTimeMs) {
+        curState.timeAtLastOutput = curTimeMs;
+        curState.lastCount += curState.newCount;
+        curState.newCount = 0;
+        outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
+      }
+      // persist the updated state whether or not an output was generated
+      this.statsStore.put(wndKey, curState);
+      return outputStats;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked") // store serdes are configured elsewhere; assumed to match these types
+    public void init(Config config, TaskContext context) {
+      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+    }
+  }
+
+  /** Page-view input event; {@code static} so instances do not capture an enclosing instance. */
+  static class PageViewEvent {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  static class StatsOutput {
+    private String memberId;
+    private long timestamp;
+    private Integer count;
+
+    StatsOutput(String key, long timestamp, Integer count) {
+      this.memberId = key;
+      this.timestamp = timestamp;
+      this.count = count;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
new file mode 100644
index 0000000..ff983a4
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+/**
+ * Example {@link StreamApplication} that merges three page-view input streams into a single
+ * output stream.
+ */
+public class MergeExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    MergeExample app = new MergeExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    KVSerde<String, PageViewEvent>
+        pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+
+    MessageStream.mergeAll(ImmutableList.of(
+            graph.getInputStream("viewStream1", pgeMsgSerde),
+            graph.getInputStream("viewStream2", pgeMsgSerde),
+            graph.getInputStream("viewStream3", pgeMsgSerde)))
+        .sendTo(graph.getOutputStream("mergedStream", pgeMsgSerde));
+
+  }
+
+  /** Page-view payload; {@code static} so instances do not capture an enclosing MergeExample. */
+  static class PageViewEvent {
+    String pageId;
+    long viewTimestamp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..1c0bc25
--- /dev/null
+++
b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Simple 2-way stream-to-stream join example: each order is joined with the shipment that has the
+ * same {@code orderId}, and the resulting fulfilled order is sent to the {@code fulfilledOrders}
+ * output stream, keyed by order id.
+ */
+public class OrderShipmentJoinExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    OrderShipmentJoinExample app = new OrderShipmentJoinExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    MessageStream<OrderRecord> orders =
+        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
+    MessageStream<ShipmentRecord> shipments =
+        graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
+        graph.getOutputStream("fulfilledOrders",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
+
+    orders
+        // join on orderId; the Duration argument is the join TTL for messages waiting for a match
+        .join(shipments, new MyJoinFunction(),
+            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
+            Duration.ofMinutes(1), "join")
+        .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
+        .sendTo(fulfilledOrders);
+
+  }
+
+  /** Joins an order with its shipment on {@code orderId}. */
+  static class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
+    @Override
+    public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+    }
+
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.orderId;
+    }
+
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.orderId;
+    }
+  }
+
+  /** Order input record; {@code static} so instances do not capture an enclosing instance. */
+  static class OrderRecord {
+    String orderId;
+    long orderTimeMs;
+
+    OrderRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = timeMs;
+    }
+  }
+
+  /** Shipment input record; {@code static} so instances do not capture an enclosing instance. */
+  static class ShipmentRecord {
+    String orderId;
+    long shipTimeMs;
+
+    ShipmentRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.shipTimeMs = timeMs;
+    }
+  }
+
+  /** Output record produced by {@link MyJoinFunction}. */
+  static class FulfilledOrderRecord {
+    String orderId;
+    long orderTimeMs;
+    long shipTimeMs;
+
+    FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = orderTimeMs;
+      this.shipTimeMs = shipTimeMs;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..2581506
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example code to implement a window-based counter: page views are counted per member in
+ * 10-second tumbling windows, and early results are emitted after every 5 messages.
+ */
+public class PageViewCounterExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    PageViewCounterExample app = new PageViewCounterExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
+        graph.getOutputStream("pageViewEventPerMemberStream",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
+
+    SupplierFunction<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
+    // DISCARDING: the count is reset after every emission, so each pane reports only new views
+    pageViewEvents
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
+        .sendTo(pageViewEventPerMemberStream);
+  }
+
+  static class PageViewEvent {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  static class PageViewCount {
+    String memberId;
+    long timestamp;
+    int count;
+
+    PageViewCount(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..7f28346
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example {@link StreamApplication} code to test the API methods with the re-partition operator:
+ * page views are re-partitioned by member id, then counted per member in 5-minute tumbling windows.
+ */
+public class RepartitionExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    RepartitionExample app = new RepartitionExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
+
+    pageViewEvents
+        // re-key by memberId so the keyed window below groups each member's views together
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
+            "window")
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
+        .sendTo(pageViewEventPerMember);
+  }
+
+  static class PageViewEvent {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  static class MyStreamOutput {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..4950695
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ a window operator that counts the
+ * messages on {@code inputStream} in 10-minute tumbling windows.
+ */
+public class WindowExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    WindowExample app = new WindowExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    SupplierFunction<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+    // pass the target class so that the JSON input is deserialized into PageViewEvent objects
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
+
+    // create a tumbling window that outputs the number of messages collected every 10 minutes.
+    // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
+    // for 1 minute.
+    inputStream
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
+            .setEarlyTrigger(Triggers.any(Triggers.count(30000),
+                Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
+        .map(WindowPane::getMessage)
+        .sendTo(outputStream);
+
+  }
+
+  static class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java index de0d962..a1ac299 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java @@ -20,6 +20,8 @@ package org.apache.samza.test.framework; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.ObjectInputStream; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.SinkFunction; @@ -155,6 +157,18 @@ public class StreamAssert<M> { } } + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + timer = new Timer(); +
actual = Collections.synchronizedList(new ArrayList<>()); + timerTask = new TimerTask() { + @Override + public void run() { + check(); + } + }; + } + private void check() { final CountDownLatch latch = LATCHES.get(id); try { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index 29c509d..3301af8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -40,11 +40,10 @@ import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.ArraySystemFactory; import org.apache.samza.test.util.Base64Serializer; -import org.junit.Test; +import org.junit.Test; import static org.junit.Assert.assertEquals; - /** * This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input. * It verifies the pipeline will stop and the number of output messages should equal to the input.
@@ -53,6 +52,8 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"}; + private static List<PageView> received = new ArrayList<>(); + @Test public void testPipeline() throws Exception { Random random = new Random(); @@ -66,6 +67,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { int partitionCount = 4; Map<String, String> configs = new HashMap<>(); + configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); configs.put("streams.PageView.samza.system", "test"); configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews)); @@ -89,7 +91,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); - List<PageView> received = new ArrayList<>(); final StreamApplication app = (streamGraph, cfg) -> { streamGraph.<KV<String, PageView>>getInputStream("PageView") .map(Values.create()) @@ -98,6 +99,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { received.add(m.getValue()); }); }; + runner.run(app); runner.waitForFinish(); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index dda3d24..d4dc4ed 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java 
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -118,6 +118,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { @Test public void testWatermark() throws Exception { Map<String, String> configs = new HashMap<>(); + configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); configs.put("systems.test.samza.factory", TestSystemFactory.class.getName()); configs.put("streams.PageView.samza.system", "test"); configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT)); @@ -140,7 +141,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName()); configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName()); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); List<PageView> received = new ArrayList<>(); final StreamApplication app = (streamGraph, cfg) -> { streamGraph.<KV<String, PageView>>getInputStream("PageView") @@ -150,11 +150,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { received.add(m.getValue()); }); }; + + LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); runner.run(app); + // processors are only available when the app is running Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner); runner.waitForFinish(); - + // wait for the completion to ensure that all tasks are actually initialized and the OperatorImplGraph is initialized StreamOperatorTask task0 = tasks.get("Partition 0"); OperatorImplGraph graph = TestStreamOperatorTask.getOperatorImplGraph(task0); OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index 120f902..2171d07 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -19,17 +19,22 @@ package org.apache.samza.test.operator; +import java.util.ArrayList; +import java.util.List; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.operator.data.AdClick; @@ -37,6 +42,8 @@ import org.apache.samza.test.operator.data.PageView; import org.apache.samza.test.operator.data.UserPageAdClick; import java.time.Duration; +import org.apache.samza.util.CommandLine; + /** * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count. 
@@ -47,6 +54,19 @@ public class RepartitionJoinWindowApp implements StreamApplication { public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2"; public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName"; + private final List<StreamSpec> intermediateStreams = new ArrayList<>(); + + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + + RepartitionJoinWindowApp application = new RepartitionJoinWindowApp(); + LocalApplicationRunner runner = new LocalApplicationRunner(config); + + runner.run(application); + runner.waitForFinish(); + } + @Override public void init(StreamGraph graph, Config config) { String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP); @@ -56,25 +76,27 @@ public class RepartitionJoinWindowApp implements StreamApplication { MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class)); MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class)); - MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews + MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews .partitionBy(PageView::getViewId, pv -> pv, - new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId") - .map(KV::getValue); + new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId"); + + MessageStream<PageView> pageViewsRepartitionedByViewIdValueONly = pageViewsRepartitionedByViewId.map(KV::getValue); - MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks + MessageStream<KV<String, AdClick>> adClicksRepartitionedByViewId = adClicks .partitionBy(AdClick::getViewId, ac -> ac, - new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId") - .map(KV::getValue); + new KVSerde<>(new StringSerde(), new 
JsonSerdeV2<>(AdClick.class)), "adClicksByViewId"); + MessageStream<AdClick> adClicksRepartitionedByViewIdValueOnly = adClicksRepartitionedByViewId.map(KV::getValue); - MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId - .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(), + MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewIdValueONly + .join(adClicksRepartitionedByViewIdValueOnly, new UserPageViewAdClicksJoiner(), new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class), Duration.ofMinutes(1), "pageViewAdClickJoin"); - userPageAdClicks + MessageStream<KV<String, UserPageAdClick>> userPageAdClicksByUserId = userPageAdClicks .partitionBy(UserPageAdClick::getUserId, upac -> upac, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId") - .map(KV::getValue) + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId"); + + userPageAdClicksByUserId.map(KV::getValue) .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3), new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow") .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) @@ -82,6 +104,16 @@ public class RepartitionJoinWindowApp implements StreamApplication { taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER); messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue())); }); + + + intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec()); + intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec()); + intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec()); + + } + + public 
List<StreamSpec> getIntermediateStreams() { + return intermediateStreams; } private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java new file mode 100644 index 0000000..e233793 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.operator; + +import java.time.Duration; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.test.operator.data.PageView; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count. + */ +public class RepartitionWindowApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class); + + static final String INPUT_TOPIC = "page-views"; + static final String OUTPUT_TOPIC = "Result"; + + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + RepartitionWindowApp reparApp = new RepartitionWindowApp(); + LocalApplicationRunner runner = new LocalApplicationRunner(config); + + runner.run(reparApp); + runner.waitForFinish(); + } + + @Override + public void init(StreamGraph graph, Config config) { + + KVSerde<String, PageView> + pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class)); + + graph.getInputStream(INPUT_TOPIC, pgeMsgSerde) + .map(KV::getValue) + .partitionBy(PageView::getUserId, m -> m, pgeMsgSerde, "p1") + .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1") + .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage()))) + 
.sendTo(graph.getOutputStream(OUTPUT_TOPIC)); + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 997127e..3224d24 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -19,6 +19,7 @@ package org.apache.samza.test.operator; +import java.time.Duration; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; @@ -26,13 +27,15 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.test.operator.data.PageView; - -import java.time.Duration; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link StreamApplication} that demonstrates a filter followed by a session window. 
@@ -40,10 +43,23 @@ import java.time.Duration; public class SessionWindowApp implements StreamApplication { private static final String INPUT_TOPIC = "page-views"; private static final String OUTPUT_TOPIC = "page-view-counts"; + + private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class); private static final String FILTER_KEY = "badKey"; + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + SessionWindowApp app = new SessionWindowApp(); + LocalApplicationRunner runner = new LocalApplicationRunner(config); + + runner.run(app); + runner.waitForFinish(); + } + @Override public void init(StreamGraph graph, Config config) { + MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class)); OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde())); @@ -54,5 +70,6 @@ public class SessionWindowApp implements StreamApplication { new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow") .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 77cd19a..5424888 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -122,7 +122,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe // Verify that 
messages in the intermediate stream will be deleted in 10 seconds long startTimeMs = System.currentTimeMillis(); - for (StreamSpec spec: runner.getExecutionPlan(app).getIntermediateStreams()) { + for (StreamSpec spec: app.getIntermediateStreams()) { long remainingMessageNum = -1; while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java new file mode 100644 index 0000000..fbc315f --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.test.operator; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.test.operator.data.PageView; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.apache.samza.test.operator.RepartitionWindowApp.*; + +/** + * Test driver for {@link RepartitionWindowApp}. + */ +public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness { + + static final String APP_NAME = "PageViewCounterApp"; + + @Test + public void testRepartitionedSessionWindowCounter() throws Exception { + // create topics + createTopic(INPUT_TOPIC, 3); + createTopic(OUTPUT_TOPIC, 1); + + // produce messages to different partitions. + ObjectMapper mapper = new ObjectMapper(); + PageView pv = new PageView("india", "5.com", "userId1"); + produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv)); + pv = new PageView("china", "4.com", "userId2"); + produceMessage(INPUT_TOPIC, 1, "userId2", mapper.writeValueAsString(pv)); + pv = new PageView("india", "1.com", "userId1"); + produceMessage(INPUT_TOPIC, 2, "userId1", mapper.writeValueAsString(pv)); + pv = new PageView("india", "2.com", "userId1"); + produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv)); + pv = new PageView("india", "3.com", "userId1"); + produceMessage(INPUT_TOPIC, 1, "userId1", mapper.writeValueAsString(pv)); + + Map<String, String> configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + 
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"); + configs.put(String.format("streams.%s.samza.msg.serde", INPUT_TOPIC), "string"); + configs.put(String.format("streams.%s.samza.key.serde", INPUT_TOPIC), "string"); + + // run the application + runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs)); + + // consume and validate result + List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2); + Assert.assertEquals(messages.size(), 2); + + for (ConsumerRecord<String, String> message : messages) { + String key = message.key(); + String value = message.value(); + // Assert that there are 4 messages for userId1 and 1 message for userId2. + Assert.assertTrue(key.equals("userId1") || key.equals("userId2")); + if ("userId1".equals(key)) { + Assert.assertEquals(value, "4"); + } else { + Assert.assertEquals(value, "1"); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index 5d2a17c..40a3f91 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -19,6 +19,7 @@ package org.apache.samza.test.operator; +import java.time.Duration; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; @@ -26,13 +27,16 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.windows.Windows; +import 
org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.test.operator.data.PageView; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.time.Duration; /** * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window. @@ -40,10 +44,23 @@ import java.time.Duration; public class TumblingWindowApp implements StreamApplication { private static final String INPUT_TOPIC = "page-views"; private static final String OUTPUT_TOPIC = "page-view-counts"; + + private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class); private static final String FILTER_KEY = "badKey"; + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + TumblingWindowApp app = new TumblingWindowApp(); + LocalApplicationRunner runner = new LocalApplicationRunner(config); + + runner.run(app); + runner.waitForFinish(); + } + @Override public void init(StreamGraph graph, Config config) { + MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class)); OutputStream<KV<String, Integer>> outputStream = @@ -55,5 +72,6 @@ public class TumblingWindowApp implements StreamApplication { new StringSerde(), new JsonSerdeV2<>(PageView.class)), "tumblingWindow") .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size())) .sendTo(outputStream); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java index b114b43..e950366 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java @@ -19,9 +19,10 @@ package org.apache.samza.test.operator.data; +import java.io.Serializable; import org.codehaus.jackson.annotate.JsonProperty; -public class PageView { +public class PageView implements Serializable { private String viewId; private String pageId; private String userId; http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java new file mode 100644 index 0000000..9072bd2 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.test.processor; + +import java.util.HashMap; +import java.util.Map; + + +/** + * Shared context factories used in unit tests. This is a temporarily solution to enable sharing of test latches in different + * scope of context (i.e. in the container or the task). This is not intended for production usage. + */ +public class SharedContextFactories { + + public static class SharedContextFactory { + static Map<String, SharedContextFactory> sharedContextFactories = new HashMap<>(); + + Map<String, Object> sharedObjects = new HashMap<>(); + + public Object getSharedObject(String resourceName) { + return this.sharedObjects.get(resourceName); + } + + public void addSharedObject(String resourceName, Object sharedObj) { + this.sharedObjects.put(resourceName, sharedObj); + } + + public static SharedContextFactory getInstance(String taskName) { + if (sharedContextFactories.get(taskName) == null) { + sharedContextFactories.putIfAbsent(taskName, new SharedContextFactory()); + } + return sharedContextFactories.get(taskName); + } + + public static void clearAll() { + sharedContextFactories.clear(); + } + } + + public static class ProcessorSharedContextFactory extends SharedContextFactory { + static Map<String, ProcessorSharedContextFactory> processorSharedFactories = new HashMap<>(); + + private final String processorId; + + SharedContextFactory getTaskSharedContextFactory(String taskName) { + String globalTaskName = String.format("%s-%s", this.processorId, taskName); + return SharedContextFactory.getInstance(globalTaskName); + } + + public static ProcessorSharedContextFactory getInstance(String processorId) { + if (processorSharedFactories.get(processorId) == null) { + processorSharedFactories.putIfAbsent(processorId, new ProcessorSharedContextFactory(processorId)); + } + return processorSharedFactories.get(processorId); + } + + ProcessorSharedContextFactory(String processorId) { + this.processorId = processorId; + } + + public static void clearAll() 
{ + processorSharedFactories.clear(); + } + } + + public static class GlobalSharedContextFactory extends SharedContextFactory { + static Map<String, GlobalSharedContextFactory> globalSharedContextFactories = new HashMap<>(); + + private final String appName; + + GlobalSharedContextFactory(String appName) { + this.appName = appName; + } + + ProcessorSharedContextFactory getProcessorSharedContextFactory(String processorName) { + String globalProcessorName = String.format("%s-%s", this.appName, processorName); + return ProcessorSharedContextFactory.getInstance(globalProcessorName); + } + + public static GlobalSharedContextFactory getInstance(String appName) { + if (globalSharedContextFactories.get(appName) == null) { + globalSharedContextFactories.putIfAbsent(appName, new GlobalSharedContextFactory(appName)); + } + return globalSharedContextFactories.get(appName); + } + + public static void clearAll() { + globalSharedContextFactories.clear(); + } + } + + public static GlobalSharedContextFactory getGlobalSharedContextFactory(String appName) { + return GlobalSharedContextFactory.getInstance(appName); + } + + public static void clearAll() { + GlobalSharedContextFactory.clearAll(); + ProcessorSharedContextFactory.clearAll(); + SharedContextFactory.clearAll(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java new file mode 100644 index 0000000..db12351 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.processor; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.StringSerde; + + +/** + * Test {@link StreamApplication} that copies every input event to the output topic; its map function re-attaches the latches and callback registered via {@link #getInstance} (looked up in {@link SharedContextFactories}) when it is deserialized. + */ +public class TestStreamApplication implements StreamApplication, Serializable { + + private final String inputTopic; + private final String outputTopic; + private final String appName; + private final String processorName; + + private TestStreamApplication(String inputTopic, String outputTopic, String appName, String processorName) { + this.inputTopic = inputTopic; + this.outputTopic = outputTopic; + this.appName = appName; + this.processorName = processorName; + } + + @Override + public void init(StreamGraph graph, Config
config) { + MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>()); + OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde()); + inputStream.map(new MapFunction<String, String>() { + transient CountDownLatch latch1; + transient CountDownLatch latch2; + transient StreamApplicationCallback callback; + + @Override + public String apply(String message) { + TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message); + if (callback != null) { + callback.onMessage(incomingMessage); + } + if (latch1 != null) { + latch1.countDown(); + } + if (latch2 != null) { + latch2.countDown(); + } + return incomingMessage.toString(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + SharedContextFactories.SharedContextFactory contextFactory = + SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName); + this.latch1 = (CountDownLatch) contextFactory.getSharedObject("processedMsgLatch"); + this.latch2 = (CountDownLatch) contextFactory.getSharedObject("kafkaMsgsConsumedLatch"); + this.callback = (StreamApplicationCallback) contextFactory.getSharedObject("callback"); + } + }).sendTo(outputStream); + } + + public interface StreamApplicationCallback { + void onMessage(TestKafkaEvent m); + } + + public static class TestKafkaEvent implements Serializable { + + // Actual content of the event. + private String eventData; + + // Contains Integer value, which is greater than previous message id.
+ private String eventId; + + TestKafkaEvent(String eventId, String eventData) { + this.eventData = eventData; + this.eventId = eventId; + } + + String getEventId() { + return eventId; + } + + String getEventData() { + return eventData; + } + + @Override + public String toString() { + return eventId + "|" + eventData; + } + + static TestKafkaEvent fromString(String message) { + String[] messageComponents = message.split("\\|", 2); // Inverse of toString(). split() takes a regex: an unescaped "|" matches the empty string and splits on every character. Limit 2 keeps any '|' inside eventData and yields "" for an empty payload. + return new TestKafkaEvent(messageComponents[0], messageComponents[1]); + } + } + + public static StreamApplication getInstance( + String inputTopic, + String outputTopic, + CountDownLatch processedMessageLatch, + StreamApplicationCallback callback, + CountDownLatch kafkaEventsConsumedLatch, + Config config) { + String appName = String.format("%s-%s", config.get(ApplicationConfig.APP_NAME), config.get(ApplicationConfig.APP_ID)); + String processorName = config.get(JobConfig.PROCESSOR_ID()); + registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName); + + StreamApplication app = new TestStreamApplication(inputTopic, outputTopic, appName, processorName); + return app; + } + + private static void registerLatches(CountDownLatch processedMessageLatch, CountDownLatch kafkaEventsConsumedLatch, + StreamApplicationCallback callback, String appName, String processorName) { + SharedContextFactories.SharedContextFactory contextFactory = SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName); + contextFactory.addSharedObject("processedMsgLatch", processedMessageLatch); + contextFactory.addSharedObject("kafkaMsgsConsumedLatch", kafkaEventsConsumedLatch); + contextFactory.addSharedObject("callback", callback); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 9d2cd92..0b0a271 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -23,6 +23,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import kafka.admin.AdminUtils; import kafka.utils.TestUtils; import org.I0Itec.zkclient.ZkClient; @@ -42,33 +50,23 @@ import org.apache.samza.container.TaskName; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.StringSerde; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; +import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback; +import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent; import org.apache.samza.util.NoOpMetricsRegistry; import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkUtils; -import org.junit.*; +import org.junit.Assert; +import org.junit.Rule; +import 
org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -102,9 +100,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private ApplicationConfig applicationConfig1; private ApplicationConfig applicationConfig2; private ApplicationConfig applicationConfig3; - private LocalApplicationRunner applicationRunner1; - private LocalApplicationRunner applicationRunner2; - private LocalApplicationRunner applicationRunner3; private String testStreamAppName; private String testStreamAppId; @@ -141,11 +136,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); - // Create local application runners. - applicationRunner1 = new LocalApplicationRunner(applicationConfig1); - applicationRunner2 = new LocalApplicationRunner(applicationConfig2); - applicationRunner3 = new LocalApplicationRunner(applicationConfig3); - for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) { LOGGER.info("Creating kafka topic: {}.", kafkaTopic); TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties()); @@ -251,9 +241,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up stream app 2. 
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS); - LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig)); - StreamApplication streamApp2 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, - processedMessagesLatch, null, null); + Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); + LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + processedMessagesLatch, null, null, testAppConfig2); // Callback handler for streamApp1. StreamApplicationCallback streamApplicationCallback = message -> { @@ -272,9 +263,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); // Set up stream app 1. 
- LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig)); - StreamApplication streamApp1 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, - null, streamApplicationCallback, kafkaEventsConsumedLatch); + Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); + LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, + null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); localApplicationRunner1.run(streamApp1); kafkaEventsConsumedLatch.await(); @@ -288,6 +280,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value. assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); + + // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 + // localApplicationRunner1.kill(streamApp1); + // localApplicationRunner2.kill(streamApp2); + + // localApplicationRunner1.waitForFinish(); + // localApplicationRunner2.waitForFinish(); } /** @@ -326,8 +325,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up streamApp2. 
CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2); - LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig)); - StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null); + Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig); + LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2); // Callback handler for streamApp1. StreamApplicationCallback streamApplicationCallback = message -> { @@ -348,9 +348,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1); // Set up stream app 1. - LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig)); - StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, - streamApplicationCallback, kafkaEventsConsumedLatch); + Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig); + LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, + streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1); localApplicationRunner1.run(streamApp1); kafkaEventsConsumedLatch.await(); @@ -381,8 +382,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(taskModel1.getSystemStreamPartitions(), taskModel2.getSystemStreamPartitions()); assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName())); - // TODO: After 
SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) processedMessagesLatch.await(); + + assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2)); + + // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 + // localApplicationRunner1.kill(streamApp1); + // localApplicationRunner2.kill(streamApp2); + + // localApplicationRunner1.waitForFinish(); + // localApplicationRunner2.waitForFinish(); } @Test @@ -396,8 +405,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); - StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch); - StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); + StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3); + + // Create LocalApplicationRunners + LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); + LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); // Run stream applications. 
applicationRunner1.run(streamApp1); @@ -428,7 +442,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne kafkaEventsConsumedLatch.await(); publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch); + LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3); applicationRunner3.run(streamApp3); processedMessagesLatch3.await(); @@ -441,6 +455,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne jobModel = zkUtils.getJobModel(jobModelVersion); assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet()); assertEquals(2, jobModel.getContainers().size()); + + // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 + // applicationRunner2.kill(streamApp2); + // applicationRunner3.kill(streamApp3); + + // applicationRunner2.waitForFinish(); + // applicationRunner3.waitForFinish(); } @Test @@ -453,8 +474,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch); - StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, 
applicationConfig2); + + // Create LocalApplicationRunners + LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); + LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); // Run stream applications. applicationRunner1.run(streamApp1); @@ -464,15 +489,24 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne processedMessagesLatch1.await(); processedMessagesLatch2.await(); - LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(new MapConfig(applicationConfig2)); + LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2); // Create a stream app with same processor id as SP2 and run it. It should fail. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]); kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); - StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch); + StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2); // Fail when the duplicate processor joins. 
expectedException.expect(SamzaException.class); - applicationRunner3.run(streamApp3); + try { + applicationRunner3.run(streamApp3); + } finally { + // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 + // applicationRunner1.kill(streamApp1); + // applicationRunner2.kill(streamApp2); + + // applicationRunner1.waitForFinish(); + // applicationRunner2.waitForFinish(); + } } @Test @@ -496,13 +530,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2); + List<TestKafkaEvent> messagesProcessed = new ArrayList<>(); + StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m); + // Create StreamApplication from configuration. CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS); CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); - StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch); - StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch); + StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1); + StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2); // Run stream application. 
applicationRunner1.run(streamApp1); @@ -521,7 +558,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1); processedMessagesLatch1 = new CountDownLatch(1); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch); + streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1); applicationRunner4.run(streamApp1); processedMessagesLatch1.await(); @@ -532,85 +569,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion)); assertEquals(jobModel.getContainers(), newJobModel.getContainers()); - } - - public interface StreamApplicationCallback { - void onMessageReceived(TestKafkaEvent message); - } - - private static class TestKafkaEvent implements Serializable { - - // Actual content of the event. - private String eventData; - - // Contains Integer value, which is greater than previous message id. 
- private String eventId; - TestKafkaEvent(String eventId, String eventData) { - this.eventData = eventData; - this.eventId = eventId; - } - - String getEventId() { - return eventId; - } - - String getEventData() { - return eventData; - } - - @Override - public String toString() { - return eventId + "|" + eventData; - } + // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665 + // applicationRunner2.kill(streamApp2); + // applicationRunner4.kill(streamApp1); - static TestKafkaEvent fromString(String message) { - String[] messageComponents = message.split("\\|"); - return new TestKafkaEvent(messageComponents[0], messageComponents[1]); - } + // applicationRunner2.waitForFinish(); + // applicationRunner4.waitForFinish(); } - /** - * Publishes all input events to output topic(has no processing logic) - * and triggers {@link StreamApplicationCallback} with each received event. - **/ - private static class TestStreamApplication implements StreamApplication { - - private final String inputTopic; - private final String outputTopic; - private final CountDownLatch processedMessagesLatch; - private final StreamApplicationCallback streamApplicationCallback; - private final CountDownLatch kafkaEventsConsumedLatch; - - TestStreamApplication(String inputTopic, String outputTopic, - CountDownLatch processedMessagesLatch, - StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) { - this.inputTopic = inputTopic; - this.outputTopic = outputTopic; - this.processedMessagesLatch = processedMessagesLatch; - this.streamApplicationCallback = streamApplicationCallback; - this.kafkaEventsConsumedLatch = kafkaEventsConsumedLatch; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>()); - OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde()); - inputStream - .map(msg -> { - 
TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg); - if (streamApplicationCallback != null) { - streamApplicationCallback.onMessageReceived(incomingMessage); - } - if (processedMessagesLatch != null) { - processedMessagesLatch.countDown(); - } - if (kafkaEventsConsumedLatch != null) { - kafkaEventsConsumedLatch.countDown(); - } - return incomingMessage.toString(); - }) - .sendTo(outputStream); - } - } }
