SAMZA-1092: replace stream spec in fluent API. Replaced the StreamSpec class with the new one from SAMZA-1075.
Author: Yi Pan (Data Infrastructure) <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #58 from nickpan47/replace-stream-spec and squashes the following commits: 761ebb5 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in system package df953c2 [Yi Pan (Data Infrastructure)] SAMZA-1092: fix unit test 71331d8 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class 2fb19e9 [Yi Pan (Data Infrastructure)] SAMZA-1092: replace StreamSpec w/ new class in fluent API ed3ad8e [Yi Pan (Data Infrastructure)] WIP: replace stream spec in fluent API Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93c82f3d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93c82f3d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93c82f3d Branch: refs/heads/samza-fluent-api-v1 Commit: 93c82f3de4f86d18b363ffb84bdaa407d4f2cac5 Parents: d39bce9 Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Thu Feb 23 12:48:56 2017 -0800 Committer: Xinyu Liu <[email protected]> Committed: Mon Feb 27 12:29:45 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/operators/StreamGraph.java | 2 +- .../org/apache/samza/operators/StreamSpec.java | 46 ----------- .../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++----------- .../samza/example/KeyValueStoreExample.java | 24 +----- .../samza/example/NoContextStreamExample.java | 33 +------- .../samza/example/OrderShipmentJoinExample.java | 35 +-------- .../samza/example/PageViewCounterExample.java | 23 +----- .../samza/example/RepartitionExample.java | 23 +----- .../samza/example/TestBroadcastExample.java | 15 +--- .../apache/samza/example/TestJoinExample.java | 26 ++---- .../apache/samza/example/TestWindowExample.java | 15 +--- .../operators/impl/TestStreamOperatorImpl.java | 1 + 12 files changed, 69 insertions(+), 257 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index abc9861..30c4576 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -21,10 +21,10 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.serializers.Serde; +import org.apache.samza.system.StreamSpec; import java.util.Map; - /** * Job-level programming interface to create an operator DAG and run in various different runtime environments. */ http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java deleted file mode 100644 index c8a5e8d..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.system.SystemStream; - -import java.util.Properties; - - -/** - * This interface defines the specification of a {@link SystemStream}. It will be used by the {@link org.apache.samza.system.SystemAdmin} - * to create a {@link SystemStream} - */ -@InterfaceStability.Unstable -public interface StreamSpec { - /** - * Get the {@link SystemStream} - * - * @return {@link SystemStream} object - */ - SystemStream getSystemStream(); - - /** - * Get the physical properties of the {@link SystemStream} - * - * @return the properties of this stream - */ - Properties getProperties(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index dca3469..353f455 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -18,12 +18,12 @@ */ package org.apache.samza.operators; -import java.util.Properties; import java.util.function.Function; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.serializers.Serde; import 
org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; @@ -44,6 +44,9 @@ public class StreamGraphImpl implements StreamGraph { */ private int opId = 0; + // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed + // after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment} + // s.t. we can allow different physical instantiation of stream under different execution environment w/o code change. private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> { final StreamSpec spec; final Serde<K> keySerde; @@ -83,7 +86,7 @@ public class StreamGraphImpl implements StreamGraph { // TODO: need to find a way to directly pass in the serde class names // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), // message.getKey(), message.getKey(), message.getMessage())); - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); + mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage())); }; } } @@ -112,10 +115,10 @@ public class StreamGraphImpl implements StreamGraph { // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), // message.getKey(), message.getKey(), message.getMessage())); if (this.parKeyFn == null) { - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); + mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), 
message.getKey(), message.getMessage())); } else { // apply partition key function - mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage())); + mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage())); } }; } @@ -124,17 +127,17 @@ public class StreamGraphImpl implements StreamGraph { /** * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl} */ - private final Map<SystemStream, MessageStream> inStreams = new HashMap<>(); - private final Map<SystemStream, OutputStream> outStreams = new HashMap<>(); + private final Map<String, MessageStream> inStreams = new HashMap<>(); + private final Map<String, OutputStream> outStreams = new HashMap<>(); private ContextManager contextManager = new ContextManager() { }; @Override public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); + if (!this.inStreams.containsKey(streamSpec.getId())) { + this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); } - return this.inStreams.get(streamSpec.getSystemStream()); + return this.inStreams.get(streamSpec.getId()); } /** @@ -146,10 +149,10 @@ public class StreamGraphImpl implements StreamGraph { */ @Override public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, 
M>(this, streamSpec, keySerde, msgSerde)); + if (!this.outStreams.containsKey(streamSpec.getId())) { + this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); } - return this.outStreams.get(streamSpec.getSystemStream()); + return this.outStreams.get(streamSpec.getId()); } /** @@ -161,12 +164,12 @@ public class StreamGraphImpl implements StreamGraph { */ @Override public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde)); + if (!this.inStreams.containsKey(streamSpec.getId())) { + this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde)); } - IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream()); - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); + IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId()); + if (!this.outStreams.containsKey(streamSpec.getId())) { + this.outStreams.putIfAbsent(streamSpec.getId(), intStream); } return intStream; } @@ -200,12 +203,15 @@ public class StreamGraphImpl implements StreamGraph { /** * Helper method to be get the input stream via {@link SystemStream} * - * @param systemStream the {@link SystemStream} + * @param sstream the {@link SystemStream} * @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream} */ - public MessageStreamImpl getInputStream(SystemStream systemStream) { - if (this.inStreams.containsKey(systemStream)) { - return (MessageStreamImpl) 
this.inStreams.get(systemStream); + public MessageStreamImpl getInputStream(SystemStream sstream) { + for(MessageStream entry: this.inStreams.values()) { + if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() && + ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) { + return (MessageStreamImpl) entry; + } } return null; } @@ -217,13 +223,6 @@ public class StreamGraphImpl implements StreamGraph { return null; } - <M> MessageStream<M> getIntStream(OutputStream<M> outStream) { - if (this.inStreams.containsValue(outStream)) { - return (MessageStream<M>) outStream; - } - return null; - } - /** * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method. * @@ -234,27 +233,21 @@ public class StreamGraphImpl implements StreamGraph { */ <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) { // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec} - StreamSpec streamSpec = new StreamSpec() { - @Override - public SystemStream getSystemStream() { - // TODO: should auto-generate intermedaite stream name here - return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId)); - } + StreamSpec streamSpec = this.createIntStreamSpec(); - @Override - public Properties getProperties() { - return null; - } - }; - - if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { - this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); + if (!this.inStreams.containsKey(streamSpec.getId())) { + this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); } - IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream()); - if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { - this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); + 
IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId()); + if (!this.outStreams.containsKey(streamSpec.getId())) { + this.outStreams.putIfAbsent(streamSpec.getId(), intStream); } return intStream; } + private StreamSpec createIntStreamSpec() { + // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically + return null; + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java index 85ebc6c..4a0681e 100644 --- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -32,12 +32,10 @@ import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.task.TaskContext; import org.apache.samza.util.CommandLine; -import java.util.Properties; - /** * Example code using {@link KeyValueStore} to implement event-time window @@ -113,25 +111,9 @@ public class KeyValueStoreExample implements StreamGraphBuilder { } } - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewEvent"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka"); - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new 
SystemStream("kafka", "PageViewPerMember5min"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka"); class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { String pageId; http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java index c6d2e6e..320680c 100644 --- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java @@ -28,13 +28,12 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.CommandLine; import java.util.ArrayList; import java.util.List; -import java.util.Properties; /** @@ -42,35 +41,11 @@ import java.util.Properties; */ public class NoContextStreamExample implements StreamGraphBuilder { - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "input1"); - } + StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka"); - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka"); - StreamSpec input2 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new 
SystemStream("kafka", "input2"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "output"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka"); class MessageType { String joinKey; http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java index 0477066..30ce7d2 100644 --- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -23,17 +23,14 @@ import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.config.Config; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; -import java.util.Properties; - /** * Simple 2-way stream-to-stream join example @@ -71,35 +68,11 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder { standaloneEnv.run(new OrderShipmentJoinExample(), config); } - StreamSpec input1 = new StreamSpec() { - @Override public 
SystemStream getSystemStream() { - return new SystemStream("kafka", "Orders"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec input2 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "Shipment"); - } + StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka"); - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka"); - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "FulfilledOrders"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka"); class OrderRecord implements MessageEnvelope<String, OrderRecord> { String orderId; http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index f7d8bda..fcf67a7 100644 --- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -29,11 +29,10 @@ import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; import java.time.Duration; -import java.util.Properties; /** @@ -62,25 +61,9 @@ public class PageViewCounterExample 
implements StreamGraphBuilder { standaloneEnv.run(new PageViewCounterExample(), config); } - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewEvent"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka"); - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewPerMember5min"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka"); class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { String pageId; http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index 6994ac4..228668c 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -27,11 +27,10 @@ import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.util.CommandLine; import java.time.Duration; -import java.util.*; /** @@ -73,25 +72,9 @@ public class RepartitionExample implements StreamGraphBuilder { standaloneEnv.run(new RepartitionExample(), config); } - StreamSpec input1 = new StreamSpec() { - @Override public 
SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewEvent"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka"); - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewPerMember5min"); - } - - @Override public Properties getProperties() { - return null; - } - }; + StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka"); class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { String pageId; http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java index d22324b..059afce 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java @@ -22,18 +22,16 @@ package org.apache.samza.example; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.Offset; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import java.time.Duration; import java.util.function.BiFunction; -import 
java.util.Properties; import java.util.Set; @@ -70,15 +68,8 @@ public class TestBroadcastExample extends TestExampleBase { public void init(StreamGraph graph, Config config) { BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; inputs.keySet().forEach(entry -> { - MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return entry; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null).map(this::getInputMessage); + MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream( + new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage); inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java index fe6e7e7..cc53814 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java @@ -22,19 +22,18 @@ package org.apache.samza.example; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import 
org.apache.samza.operators.data.Offset; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.Set; @@ -65,16 +64,9 @@ public class TestJoinExample extends TestExampleBase { public void init(StreamGraph graph, Config config) { for (SystemStream input : inputs.keySet()) { + StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem()); MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( - new StreamSpec() { - @Override public SystemStream getSystemStream() { - return input; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null).map(this::getInputMessage); + inputStreamSpec, null, null).map(this::getInputMessage); if (joinOutput == null) { joinOutput = newSource; } else { @@ -82,15 +74,9 @@ public class TestJoinExample extends TestExampleBase { } } - joinOutput.sendTo(graph.createOutStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return null; - } - - @Override public Properties getProperties() { - return null; - } - }, new StringSerde("UTF-8"), new JsonSerde<>())); + joinOutput.sendTo(graph.createOutStream( + new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"), + new StringSerde("UTF-8"), new JsonSerde<>())); } http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java 
index e08ca20..73f4674 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java @@ -21,18 +21,16 @@ package org.apache.samza.example; import org.apache.samza.config.Config; import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.data.Offset; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import java.time.Duration; import java.util.function.BiFunction; -import java.util.Properties; import java.util.Set; @@ -60,15 +58,8 @@ public class TestWindowExample extends TestExampleBase { @Override public void init(StreamGraph graph, Config config) { BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; - inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return source; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null). + inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream( + new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null). 
map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); http://git-wip-us.apache.org/repos/asf/samza/blob/93c82f3d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 010a210..0a873fd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.*; public class TestStreamOperatorImpl { @Test + @SuppressWarnings("unchecked") public void testSimpleOperator() { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
