http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java new file mode 100644 index 0000000..f0f6ef2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system; + +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraphImpl; + + +/** + * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment + */ +public class StandaloneExecutionEnvironment implements ExecutionEnvironment { + + // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} + StreamGraph createGraph(StreamGraphBuilder app, Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + app.init(graph, config); + return graph; + } + + @Override public void run(StreamGraphBuilder app, Config config) { + // 1. get logic graph for optimization + // StreamGraph logicGraph = this.createGraph(app, config); + // 2. potential optimization.... + // 3. create new instance of StreamGraphBuilder that would generate the optimized graph + // 4. create all input/output/intermediate topics + // 5. create the configuration for StreamProcessor + // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder + } + +}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java new file mode 100644 index 0000000..b007e3c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; + +import java.util.HashMap; +import java.util.Map; + + +/** + * Execution of the logic sub-DAG + * + * + * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them + * through the user's stream transformations defined in {@link StreamGraphImpl} using the + * {@link org.apache.samza.operators.MessageStream} APIs. + * <p> + * This class brings all the operator API implementation components together and feeds the + * {@link InputMessageEnvelope}s into the transformation chains. + * <p> + * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor. + * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl} + * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context + * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input + * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl} + * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}. + * <p> + * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input + * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG + * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the + * root node of the DAG, which this class saves. + * <p> + * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it + * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} + * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates + * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s. + */ +public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask { + + /** + * A mapping from each {@link SystemStream} to the root node of its operator chain DAG. + */ + private final OperatorGraph operatorGraph = new OperatorGraph(); + + private final StreamGraphBuilder graphBuilder; + + private ContextManager contextManager; + + public StreamOperatorTask(StreamGraphBuilder graphBuilder) { + this.graphBuilder = graphBuilder; + } + + @Override + public final void init(Config config, TaskContext context) throws Exception { + // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task + StreamGraphImpl streams = new StreamGraphImpl(); + this.graphBuilder.init(streams, config); + // get the context manager of the {@link StreamGraph} and initialize the task-specific context + this.contextManager = streams.getContextManager(); + + Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>(); + context.getSystemStreamPartitions().forEach(ssp -> { + if (!inputBySystemStream.containsKey(ssp.getSystemStream())) { + // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream} + inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream())); + } + }); + operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context)); + } + + @Override + public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { + this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream()) + .onNext(new InputMessageEnvelope(ime), collector, coordinator); + } + + @Override + public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + // TODO: invoke timer based triggers + } + + @Override + public void close() throws Exception { + this.contextManager.finalizeTaskContext(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java new file mode 100644 index 0000000..85ebc6c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.TaskContext; +import org.apache.samza.util.CommandLine; + +import java.util.Properties; + + +/** + * Example code using {@link KeyValueStore} to implement event-time window + */ +public class KeyValueStoreExample implements StreamGraphBuilder { + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * UserMainExample runnableApp = new UserMainExample(); + * runnableApp.run(remoteEnv, config); + * } + * + */ + @Override public void init(StreamGraph graph, Config config) { + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>()); + + pageViewEvents. + partitionBy(m -> m.getMessage().memberId). + flatMap(new MyStatsCounter()). + sendTo(pageViewPerMemberCounters); + + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new KeyValueStoreExample(), config); + } + + class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> { + private final int timeoutMs = 10 * 60 * 1000; + + KeyValueStore<String, StatsWindowState> statsStore; + + class StatsWindowState { + int lastCount = 0; + long timeAtLastOutput = 0; + int newCount = 0; + } + + @Override + public Collection<StatsOutput> apply(PageViewEvent message) { + List<StatsOutput> outputStats = new ArrayList<>(); + long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5; + String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp); + StatsWindowState curState = this.statsStore.get(wndKey); + curState.newCount++; + long curTimeMs = System.currentTimeMillis(); + if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) { + curState.timeAtLastOutput = curTimeMs; + curState.lastCount += curState.newCount; + curState.newCount = 0; + outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount)); + } + // update counter w/o generating output + this.statsStore.put(wndKey, curState); + return outputStats; + } + + @Override + public void init(Config config, TaskContext context) { + this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store"); + } + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class StatsOutput implements MessageEnvelope<String, StatsOutput> { + private String memberId; + private long timestamp; + private Integer count; + + StatsOutput(String key, long timestamp, Integer count) { + this.memberId = key; + this.timestamp = timestamp; + this.count = count; + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public StatsOutput getMessage() { + return this; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java new file mode 100644 index 0000000..c6d2e6e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.CommandLine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + + +/** + * Example {@link StreamGraphBuilder} code to test the API methods + */ +public class NoContextStreamExample implements StreamGraphBuilder { + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "input1"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec input2 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "input2"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "output"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getSystemStreamPartition()); + } + + class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> { + + @Override + public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1, + JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } + + @Override + public String getFirstKey(JsonMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(JsonMessageEnvelope message) { + return message.getKey(); + } + } + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config); + * remoteEnv.run(new NoContextStreamExample(), config); + * } + * + */ + @Override public void init(StreamGraph graph, Config config) { + MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream( + input1, null, null); + MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream( + input2, null, null); + OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output, + new StringSerde("UTF-8"), new JsonSerde<>()); + + inputSource1.map(this::getInputMessage). + join(inputSource2.map(this::getInputMessage), new MyJoinFunction()). + sendTo(outStream); + + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new NoContextStreamExample(), config); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java new file mode 100644 index 0000000..0477066 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.util.Properties; + + +/** + * Simple 2-way stream-to-stream join example + */ +public class OrderShipmentJoinExample implements StreamGraphBuilder { + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * UserMainExample runnableApp = new UserMainExample(); + * runnableApp.run(remoteEnv, config); + * } + * + */ + @Override public void init(StreamGraph graph, Config config) { + + MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders); + + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new OrderShipmentJoinExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "Orders"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec input2 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "Shipment"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "FulfilledOrders"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class OrderRecord implements MessageEnvelope<String, OrderRecord> { + String orderId; + long orderTimeMs; + + OrderRecord(String orderId, long timeMs) { + this.orderId = orderId; + this.orderTimeMs = timeMs; + } + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public OrderRecord getMessage() { + return this; + } + } + + class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> { + String orderId; + long shipTimeMs; + + ShipmentRecord(String orderId, long timeMs) { + this.orderId = orderId; + this.shipTimeMs = timeMs; + } + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public ShipmentRecord getMessage() { + return this; + } + } + + class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> { + String orderId; + long orderTimeMs; + long shipTimeMs; + + FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) { + this.orderId = orderId; + this.orderTimeMs = orderTimeMs; + this.shipTimeMs = shipTimeMs; + } + + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public FulFilledOrderRecord getMessage() { + return this; + } + } + + FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) { + return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs); + } + + class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> { + + @Override + public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) { + return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage); + } + + @Override + public String getFirstKey(OrderRecord message) { + return message.getKey(); + } + + @Override + public String getSecondKey(ShipmentRecord message) { + return message.getKey(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java new file mode 100644 index 0000000..f7d8bda --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.time.Duration; +import java.util.Properties; + + +/** + * Example code to implement window-based counter + */ +public class PageViewCounterExample implements StreamGraphBuilder { + + @Override public void init(StreamGraph graph, Config config) { + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + pageViewEvents. + window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). + setEarlyTrigger(Triggers.repeat(Triggers.count(5))). + setAccumulationMode(AccumulationMode.DISCARDING)). + map(MyStreamOutput::new). + sendTo(pageViewPerMemberCounters); + + } + + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new PageViewCounterExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { + String memberId; + long timestamp; + int count; + + MyStreamOutput(WindowPane<String, Integer> m) { + this.memberId = m.getKey().getKey(); + this.timestamp = Long.valueOf(m.getKey().getPaneId()); + this.count = m.getMessage(); + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public MyStreamOutput getMessage() { + return this; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java new file mode 100644 index 0000000..6994ac4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.*; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.time.Duration; +import java.util.*; + + +/** + * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator + */ +public class RepartitionExample implements StreamGraphBuilder { + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * remoteEnv.run(new UserMainExample(), config); + * } + * + */ + @Override public void init(StreamGraph graph, Config config) { + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + pageViewEvents. + partitionBy(m -> m.getMessage().memberId). + window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow( + msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). + map(MyStreamOutput::new). + sendTo(pageViewPerMemberCounters); + + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new RepartitionExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { + String memberId; + long timestamp; + int count; + + MyStreamOutput(WindowPane<String, Integer> m) { + this.memberId = m.getKey().getKey(); + this.timestamp = Long.valueOf(m.getKey().getPaneId()); + this.count = m.getMessage(); + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public MyStreamOutput getMessage() { + return this; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java new file mode 100644 index 0000000..8ecd44f --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import java.lang.reflect.Field; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Unit test for {@link StreamOperatorTask} + */ +public class TestBasicStreamGraphs { + + private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { + for (int i = 0; i < 4; i++) { + this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i))); + } + } }; + + @Test + public void testUserTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestWindowExample userTask = new TestWindowExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + @Test + public void testSplitTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + + @Test + public void testJoinTask() throws Exception { + Config mockConfig = mock(Config.class); + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); + TestJoinExample joinTask = new TestJoinExample(this.inputPartitions); + StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask); + Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); + pipelineMapFld.setAccessible(true); + OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); + + adaptorTask.init(mockConfig, mockContext); + this.inputPartitions.forEach(partition -> { + assertNotNull(opGraph.get(partition.getSystemStream())); + }); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java new file mode 100644 index 0000000..d22324b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of split stream tasks + * + */ +public class TestBroadcastExample extends TestExampleBase { + + TestBroadcastExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { + return this.timestamp; + } + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public void init(StreamGraph graph, Config config) { + BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; + inputs.keySet().forEach(entry -> { + MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return entry; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null).map(this::getInputMessage); + + inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + }); + } + + JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { + return (JsonMessageEnvelope) m1.getMessage(); + } + + boolean myFilter1(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key2"); + } + + boolean myFilter3(JsonMessageEnvelope m1) { + return m1.getMessage().parKey.equals("key3"); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java new file mode 100644 index 0000000..c4df9d4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Base class for test examples + * + */ +public abstract class TestExampleBase implements StreamGraphBuilder { + + protected final Map<SystemStream, Set<SystemStreamPartition>> inputs; + + TestExampleBase(Set<SystemStreamPartition> inputs) { + this.inputs = new HashMap<>(); + for (SystemStreamPartition input : inputs) { + this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>()); + this.inputs.get(input.getSystemStream()).add(input); + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java new file mode 100644 index 0000000..fe6e7e7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of unique key-based stream-stream join tasks + * + */ +public class TestJoinExample extends TestExampleBase { + + TestJoinExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + MessageStream<JsonMessageEnvelope> joinOutput = null; + + @Override + public void init(StreamGraph graph, Config config) { + + for (SystemStream input : inputs.keySet()) { + MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( + new StreamSpec() { + @Override public SystemStream getSystemStream() { + return input; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null).map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, new MyJoinFunction()); + } + } + + joinOutput.sendTo(graph.createOutStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return null; + } + + @Override public Properties getProperties() { + return null; + } + }, new StringSerde("UTF-8"), new JsonSerde<>())); + + } + + private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getSystemStreamPartition()); + } + + class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> { + JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } + + @Override + public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) { + return this.myJoinResult(message, otherMessage); + } + + @Override + public String getFirstKey(JsonMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(JsonMessageEnvelope message) { + return message.getKey(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java new file mode 100644 index 0000000..e08ca20 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of a simple user-defined tasks w/ window operators + * + */ +public class TestWindowExample extends TestExampleBase { + class MessageType { + String field1; + String field2; + } + + TestWindowExample(Set<SystemStreamPartition> inputs) { + super(inputs); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public void init(StreamGraph graph, Config config) { + BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; + inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return source; + } + + @Override public Properties getProperties() { + return null; + } + }, null, null). + map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), + m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); + + } + + String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { + return m.getKey().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java new file mode 100644 index 0000000..160a47a --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.functions.FilterFunction; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TestMessageStreamImpl { + + private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + + @Test + public void testMap() { + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) -> + new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1); + MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap); + Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next(); + assertTrue(mapOp instanceof StreamOperatorSpec); + assertEquals(mapOp.getNextStream(), outputStream); + // assert that the transformation function is what we defined above + TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class); + TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); + when(xTestMsg.getKey()).thenReturn("test-msg-key"); + when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage); + when(mockInnerTestMessage.getValue()).thenReturn("123456789"); + + Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg); + assertEquals(cOutputMsg.size(), 1); + TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next(); + assertEquals(outputMessage.getKey(), xTestMsg.getKey()); + assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1)); + } + + @Test + public void testFlatMap() { + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { { + this.add(mock(TestOutputMessageEnvelope.class)); + this.add(mock(TestOutputMessageEnvelope.class)); + this.add(mock(TestOutputMessageEnvelope.class)); + } }; + FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts; + MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap); + Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next(); + assertTrue(flatMapOp instanceof StreamOperatorSpec); + assertEquals(flatMapOp.getNextStream(), outputStream); + // assert that the transformation function is what we defined above + assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap); + } + + @Test + public void testFilter() { + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L; + MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter); + Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next(); + assertTrue(filterOp instanceof StreamOperatorSpec); + assertEquals(filterOp.getNextStream(), outputStream); + // assert that the transformation function is what we defined above + FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn(); + TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); + TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); + when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage); + when(mockInnerTestMessage.getEventTime()).thenReturn(11111L); + Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg); + assertTrue(output.isEmpty()); + when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage); + when(mockInnerTestMessage.getEventTime()).thenReturn(999999L); + output = txfmFn.apply(mockMsg); + assertEquals(output.size(), 1); + assertEquals(output.iterator().next(), mockMsg); + } + + @Test + public void testSink() { + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); + SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> { + mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); + tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + }; + inputStream.sink(xSink); + Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next(); + assertTrue(sinkOp instanceof SinkOperatorSpec); + assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink); + assertNull(((SinkOperatorSpec) sinkOp).getNextStream()); + } + + @Test + public void testJoin() { + MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph); + MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph); + JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner = + new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() { + @Override + public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) { + return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); + } + + @Override + public String getFirstKey(TestMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(TestMessageEnvelope message) { + return message.getKey(); + } + }; + + MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner); + Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next(); + assertTrue(joinOp1 instanceof PartialJoinOperatorSpec); + assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput); + subs = source2.getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next(); + assertTrue(joinOp2 instanceof PartialJoinOperatorSpec); + assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput); + TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L); + TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L); + TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1); + assertEquals(xOut.getKey(), "test-join-1"); + assertEquals(xOut.getMessage(), Integer.valueOf(24)); + } + + @Test + public void testMerge() { + MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph); + Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { { + this.add(new MessageStreamImpl<>(mockGraph)); + this.add(new MessageStreamImpl<>(mockGraph)); + } }; + MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others); + validateMergeOperator(merge1, mergeOutput); + + others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); + } + + private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) { + Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs(); + assertEquals(subs.size(), 1); + OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next(); + assertTrue(mergeOp instanceof StreamOperatorSpec); + assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput); + TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); + Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg); + assertEquals(outputs.size(), 1); + assertEquals(outputs.iterator().next(), mockMsg); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java new file mode 100644 index 0000000..c4e9f51 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + + +public class TestMessageStreamImplUtil { + public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) { + return new MessageStreamImpl<M>(graph); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java new file mode 100644 index 0000000..9a425d1 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +import org.apache.samza.system.SystemStreamPartition; + + +/** + * Example input {@link MessageEnvelope} w/ Json message and string as the key. + */ + +public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> { + + private final String key; + private final T data; + private final Offset offset; + private final SystemStreamPartition partition; + + public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) { + this.key = key; + this.data = data; + this.offset = offset; + this.partition = partition; + } + + @Override + public T getMessage() { + return this.data; + } + + @Override + public String getKey() { + return this.key; + } + + public Offset getOffset() { + return this.offset; + } + + public SystemStreamPartition getSystemStreamPartition() { + return this.partition; + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java new file mode 100644 index 0000000..361972e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.TestMessageEnvelope; +import org.apache.samza.operators.TestOutputMessageEnvelope; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.hamcrest.core.IsEqual; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class TestOperatorImpl { + + TestMessageEnvelope curInputMsg; + MessageCollector curCollector; + TaskCoordinator curCoordinator; + + @Test + public void testSubscribers() { + this.curInputMsg = null; + this.curCollector = null; + this.curCoordinator = null; + OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() { + @Override + public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { + TestOperatorImpl.this.curInputMsg = message; + TestOperatorImpl.this.curCollector = collector; + TestOperatorImpl.this.curCoordinator = coordinator; + } + }; + // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext() + OperatorImpl mockSub = mock(OperatorImpl.class); + opImpl.registerNextOperator(mockSub); + TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class); + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.propagateResult(xOutput, mockCollector, mockCoordinator); + verify(mockSub, times(1)).onNext( + argThat(new IsEqual<>(xOutput)), + argThat(new IsEqual<>(mockCollector)), + argThat(new IsEqual<>(mockCoordinator)) + ); + // verify onNext() is invoked correctly + TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); + opImpl.onNext(mockInput, mockCollector, mockCoordinator); + assertEquals(mockInput, this.curInputMsg); + assertEquals(mockCollector, this.curCollector); + assertEquals(mockCoordinator, this.curCoordinator); + } +}
