SAMZA-1454: Globally unique and user settable IDs for stateful operators Author: Prateek Maheshwari <[email protected]>
Reviewers: Jagadish Venkatraman <[email protected]>, Yi Pan <[email protected]> Closes #324 from prateekm/operator-id-uniqueness Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1296c7ff Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1296c7ff Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1296c7ff Branch: refs/heads/master Commit: 1296c7ff91daff4abeac392cec53600f7c5cb427 Parents: 711dd8d Author: Prateek Maheshwari <[email protected]> Authored: Wed Oct 25 17:18:36 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Oct 25 17:18:36 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/operators/MessageStream.java | 41 +++++++---- .../samza/execution/JobGraphJsonGenerator.java | 4 +- .../samza/operators/MessageStreamImpl.java | 44 +++++++----- .../apache/samza/operators/StreamGraphImpl.java | 74 +++++++++++++++----- .../samza/operators/impl/OperatorImpl.java | 32 ++++----- .../samza/operators/impl/OperatorImplGraph.java | 20 +++--- .../operators/impl/PartialJoinOperatorImpl.java | 32 ++++++--- .../operators/impl/WindowOperatorImpl.java | 3 +- .../samza/operators/spec/InputOperatorSpec.java | 2 +- .../samza/operators/spec/JoinOperatorSpec.java | 14 ++-- .../samza/operators/spec/OperatorSpec.java | 14 +--- .../samza/operators/spec/OperatorSpecs.java | 36 +++++++--- .../operators/spec/OutputOperatorSpec.java | 2 +- .../operators/spec/PartitionByOperatorSpec.java | 2 +- .../samza/operators/spec/SinkOperatorSpec.java | 2 +- .../operators/spec/StreamOperatorSpec.java | 2 +- .../operators/spec/WindowOperatorSpec.java | 4 +- .../samza/example/KeyValueStoreExample.java | 2 +- .../samza/example/OrderShipmentJoinExample.java | 2 +- .../samza/example/PageViewCounterExample.java | 2 +- .../samza/example/RepartitionExample.java | 5 +- .../org/apache/samza/example/WindowExample.java | 5 +- 
.../samza/execution/TestExecutionPlanner.java | 44 ++++++------ .../execution/TestJobGraphJsonGenerator.java | 16 ++--- .../org/apache/samza/execution/TestJobNode.java | 34 +++++---- .../samza/operators/TestJoinOperator.java | 33 +++++++-- .../samza/operators/TestMessageStreamImpl.java | 26 +++---- .../samza/operators/TestStreamGraphImpl.java | 61 ++++++++++------ .../samza/operators/TestWindowOperator.java | 46 ++++++++---- .../samza/operators/impl/TestOperatorImpl.java | 2 +- .../operators/impl/TestOperatorImplGraph.java | 47 ++++++++----- .../operators/spec/TestWindowOperatorSpec.java | 4 +- .../EndOfStreamIntegrationTest.java | 2 +- .../WatermarkIntegrationTest.java | 2 +- .../test/operator/RepartitionJoinWindowApp.java | 13 ++-- .../samza/test/operator/SessionWindowApp.java | 4 +- .../samza/test/operator/TumblingWindowApp.java | 3 +- 37 files changed, 425 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index c36fe1f..dcce7c8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -73,7 +73,7 @@ public interface MessageStream<M> { * should be retained in the filtered {@link MessageStream}. * * @param filterFn the predicate to filter messages from this {@link MessageStream}. - * @return the transformed {@link MessageStream} + * @return the filtered {@link MessageStream} */ MessageStream<M> filter(FilterFunction<? 
super M> filterFn); @@ -105,15 +105,19 @@ public interface MessageStream<M> { * <p> * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows. * <p> - * <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts. + * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID + * for any state stores and streams created by this operator (the full ID also contains the job name, job id and + * operator type). If the application logic is changed, this ID must be reused in the new operator to retain + * state from the previous version, and changed for the new operator to discard the state from the previous version. * * @param window the window to group and process messages from this {@link MessageStream} + * @param id the unique id of this operator in this application * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified, * panes are emitted per-key. * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream} - * @return the transformed {@link MessageStream} + * @return the windowed {@link MessageStream} */ - <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window); + <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String id); /** * Joins this {@link MessageStream} with another {@link MessageStream} using the provided @@ -124,14 +128,18 @@ public interface MessageStream<M> { * <p> * Both inputs being joined must have the same number of partitions, and should be partitioned by the join key. * <p> - * <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts. + * The {@code id} must be unique for each operator in this application. 
It is used as part of the unique ID + * for any state stores and streams created by this operator (the full ID also contains the job name, job id and + * operator type). If the application logic is changed, this ID must be reused in the new operator to retain + * state from the previous version, and changed for the new operator to discard the state from the previous version. * * @param otherStream the other {@link MessageStream} to be joined with * @param joinFn the function to join messages from this and the other {@link MessageStream} - * @param ttl the ttl for messages in each stream * @param keySerde the serde for the join key * @param messageSerde the serde for messages in this stream * @param otherMessageSerde the serde for messages in the other stream + * @param ttl the ttl for messages in each stream + * @param id the unique id of this operator in this application * @param <K> the type of join key * @param <OM> the type of messages in the other stream * @param <JM> the type of messages resulting from the {@code joinFn} @@ -139,7 +147,8 @@ public interface MessageStream<M> { */ <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn, - Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl); + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, + Duration ttl, String id); /** * Merges all {@code otherStreams} with this {@link MessageStream}. @@ -186,26 +195,34 @@ public interface MessageStream<M> { * configuration, if present. * Else, the number of partitions is set to to the max of number of partitions for all input and output streams * (excluding intermediate streams). + * <p> + * The {@code id} must be unique for each operator in this application. It is used as part of the unique ID + * for any state stores and streams created by this operator (the full ID also contains the job name, job id and + * operator type). 
If the application logic is changed, this ID must be reused in the new operator to retain + * state from the previous version, and changed for the new operator to discard the state from the previous version. * - * @param <K> the type of output key - * @param <V> the type of output value * @param keyExtractor the {@link Function} to extract the message and partition key from the input message * @param valueExtractor the {@link Function} to extract the value from the input message * @param serde the {@link KVSerde} to use for (de)serializing the key and value. + * @param id the unique id of this operator in this application + * @param <K> the type of output key + * @param <V> the type of output value * @return the repartitioned {@link MessageStream} */ <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde); + Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id); + /** - * Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde. + * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde. * * @param keyExtractor the {@link Function} to extract the message and partition key from the input message * @param valueExtractor the {@link Function} to extract the value from the input message + * @param id the unique id of this operator in this application * @param <K> the type of output key * @param <V> the type of output value * @return the repartitioned {@link MessageStream} */ <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor); + Function<? super M, ? 
extends V> valueExtractor, String id); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 23c9d89..03845e3 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -69,14 +69,14 @@ import org.codehaus.jackson.map.ObjectMapper; @JsonProperty("outputStreams") List<StreamJson> outputStreams; @JsonProperty("operators") - Map<Integer, Map<String, Object>> operators = new HashMap<>(); + Map<String, Map<String, Object>> operators = new HashMap<>(); } static final class StreamJson { @JsonProperty("streamId") String streamId; @JsonProperty("nextOperatorIds") - Set<Integer> nextOperatorIds = new HashSet<>(); + Set<String> nextOperatorIds = new HashSet<>(); } static final class JobNodeJson { http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index dc91d19..e6e711c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators; +import org.apache.samza.SamzaException; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; @@ -26,6 
+27,7 @@ import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; @@ -72,42 +74,47 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) { - OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.MAP); + OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @Override public MessageStream<M> filter(FilterFunction<? super M> filterFn) { - OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.FILTER); + OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @Override public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) { - OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.FLAT_MAP); + OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @Override public void sink(SinkFunction<? 
super M> sinkFn) { - SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.SINK); + SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId); this.operatorSpec.registerNextOperatorSpec(op); } @Override public void sendTo(OutputStream<M> outputStream) { - OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec( - (OutputStreamImpl<M>) outputStream, this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.SEND_TO); + OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId); this.operatorSpec.registerNextOperatorSpec(op); } @Override - public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) { + public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) { + String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId); OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec( - (WindowInternal<M, K, WV>) window, this.graph.getNextOpId()); + (WindowInternal<M, K, WV>) window, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @@ -115,11 +122,14 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? 
extends JM> joinFn, - Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) { + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, + Duration ttl, String userDefinedId) { + if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself."); OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec(); + String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId); JoinOperatorSpec<K, M, OM, JM> joinOpSpec = OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, - keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId()); + keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId); this.operatorSpec.registerNextOperatorSpec(joinOpSpec); otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec); @@ -130,7 +140,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) { if (otherStreams.isEmpty()) return this; - StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId()); + String opId = this.graph.getNextOpId(OpCode.MERGE); + StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId); this.operatorSpec.registerNextOperatorSpec(opSpec); otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec)); return new MessageStreamImpl<>(this.graph, opSpec); @@ -138,10 +149,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? 
extends V> valueExtractor, KVSerde<K, V> serde) { - int opId = this.graph.getNextOpId(); - String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId); - IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opName, serde); + Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) { + String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId); + IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde); PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec( intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId); @@ -151,8 +161,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor) { - return partitionBy(keyExtractor, valueExtractor, null); + Function<? super M, ? 
extends V> valueExtractor, String userDefinedId) { + return partitionBy(keyExtractor, valueExtractor, null, userDefinedId); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index a02ed3e..936cb3a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -19,10 +19,14 @@ package org.apache.samza.operators; import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec.OpCode; +import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.runtime.ApplicationRunner; @@ -48,18 +52,19 @@ import java.util.stream.Collectors; public class StreamGraphImpl implements StreamGraph { private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class); - /** - * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph. - * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}. - */ - private int opId = 0; - // We use a LHM for deterministic order in initializing and closing operators. 
private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); private final ApplicationRunner runner; private final Config config; + /** + * The 0-based position of the next operator in the graph. + * Part of the unique ID for each OperatorSpec in the graph. + * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}. + */ + private int nextOpNum = 0; + private final Set<String> operatorIds = new HashSet<>(); private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde()); private ContextManager contextManager = null; @@ -81,6 +86,7 @@ public class StreamGraphImpl implements StreamGraph { @Override public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) { StreamSpec streamSpec = runner.getStreamSpec(streamId); + Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); Preconditions.checkNotNull(serde, "serde must not be null for an input stream."); Preconditions.checkState(!inputOperators.containsKey(streamSpec), "getInputStream must not be called multiple times with the same streamId: " + streamId); @@ -96,8 +102,11 @@ public class StreamGraphImpl implements StreamGraph { } boolean isKeyed = serde instanceof KVSerde; + InputOperatorSpec inputOperatorSpec = + OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), + isKeyed, this.getNextOpId(OpCode.INPUT, null)); inputOperators.put(streamSpec, - new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId())); + inputOperatorSpec); return new MessageStreamImpl<>(this, inputOperators.get(streamSpec)); } @@ -109,6 +118,7 @@ public class StreamGraphImpl implements StreamGraph { @Override public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) { StreamSpec streamSpec = runner.getStreamSpec(streamId); +
Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); Preconditions.checkNotNull(serde, "serde must not be null for an output stream."); Preconditions.checkState(!outputStreams.containsKey(streamSpec), "getOutputStream must not be called multiple times with the same streamId: " + streamId); @@ -144,20 +154,15 @@ public class StreamGraphImpl implements StreamGraph { * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. * An intermediate {@link MessageStream} is both an output and an input stream. * - * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the - * logical streamId. + * @param streamId the id of the stream to be created. * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde * is used. * @param <M> the type of messages in the intermediate {@link MessageStream} * @return the intermediate {@link MessageStreamImpl} */ - <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamName, Serde<M> serde) { - String streamId = String.format("%s-%s-%s", - config.get(JobConfig.JOB_NAME()), - config.get(JobConfig.JOB_ID(), "1"), - streamName); + <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) { StreamSpec streamSpec = runner.getStreamSpec(streamId); - + Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec), "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); @@ -168,8 +173,10 @@ public class StreamGraphImpl implements StreamGraph { boolean isKeyed = serde instanceof KVSerde; KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - inputOperators.put(streamSpec, - new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId())); + InputOperatorSpec inputOperatorSpec = + 
OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), + isKeyed, this.getNextOpId(OpCode.INPUT, null)); + inputOperators.put(streamSpec, inputOperatorSpec); outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec)); } @@ -186,8 +193,37 @@ public class StreamGraphImpl implements StreamGraph { return this.contextManager; } - /* package private */ int getNextOpId() { - return this.opId++; + /** + * Gets the unique ID for the next operator in the graph. The ID is of the following format: + * jobName-jobId-opCode-(userDefinedId|nextOpNum); + * + * @param opCode the {@link OpCode} of the next operator + * @param userDefinedId the optional user-provided name of the next operator or null + * @return the unique ID for the next operator in the graph + */ + /* package private */ String getNextOpId(OpCode opCode, String userDefinedId) { + String nextOpId = String.format("%s-%s-%s-%s", + config.get(JobConfig.JOB_NAME()), + config.get(JobConfig.JOB_ID(), "1"), + opCode.name().toLowerCase(), + StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum)); + if (!operatorIds.add(nextOpId)) { + throw new SamzaException( + String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId)); + } + nextOpNum++; + return nextOpId; + } + + /** + * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: + * jobName-jobId-opCode-nextOpNum; + * + * @param opCode the {@link OpCode} of the next operator + * @return the unique ID for the next operator in the graph + */ + /* package private */ String getNextOpId(OpCode opCode) { + return getNextOpId(opCode, null); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 0c50630..f5a2624 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -81,14 +81,14 @@ public abstract class OperatorImpl<M, RM> { * @param context the {@link TaskContext} for the task */ public final void init(Config config, TaskContext context) { - String opName = getOperatorName(); + String opId = getOpImplId(); if (initialized) { - throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName)); + throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opId)); } if (closed) { - throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opName)); + throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opId)); } this.highResClock = createHighResClock(config); @@ -96,9 +96,9 @@ public abstract class OperatorImpl<M, RM> { prevOperators = new HashSet<>(); inputStreams = new HashSet<>(); MetricsRegistry metricsRegistry = context.getMetricsRegistry(); - this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages"); - this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, 
opName + "-handle-message-ns"); - this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns"); + this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opId + "-messages"); + this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns"); + this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns"); this.taskName = context.getTaskName(); TaskContextImpl taskContext = (TaskContextImpl) context; @@ -127,7 +127,7 @@ public abstract class OperatorImpl<M, RM> { if (!initialized) { throw new IllegalStateException( String.format("Attempted to register next operator before initializing operator %s.", - getOperatorName())); + getOpImplId())); } this.registeredOperators.add(nextOperator); nextOperator.registerPrevOperator(this); @@ -163,7 +163,7 @@ public abstract class OperatorImpl<M, RM> { String.format("Error applying operator %s (created at %s) to its input message. " + "Expected input message to be of type %s, but found it to be of type %s. 
" + "Are Serdes for the inputs to this operator configured correctly?", - getOperatorName(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e); + getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e); } long endNs = this.highResClock.nanoTime(); @@ -317,7 +317,7 @@ public abstract class OperatorImpl<M, RM> { if (inputWatermark < inputWatermarkMin) { // advance the watermark time of this operator inputWatermark = inputWatermarkMin; - LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOperatorName()); + LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOpImplId()); final Long outputWm; WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn(); @@ -340,7 +340,7 @@ public abstract class OperatorImpl<M, RM> { if (outputWatermark < outputWm) { // advance the watermark outputWatermark = outputWm; - LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOperatorName()); + LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOpImplId()); this.registeredOperators.forEach(op -> op.onWatermark(outputWatermark, collector, coordinator)); } else if (outputWatermark > outputWm) { LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", outputWm, outputWatermark); @@ -375,7 +375,7 @@ public abstract class OperatorImpl<M, RM> { public void close() { if (closed) { throw new IllegalStateException( - String.format("Attempted to close Operator %s more than once.", getOperatorSpec().getOpName())); + String.format("Attempted to close Operator %s more than once.", getOpImplId())); } handleClose(); closed = true; @@ -391,16 +391,16 @@ public abstract class OperatorImpl<M, RM> { protected abstract OperatorSpec<M, RM> getOperatorSpec(); /** - * Get the unique name for this {@link OperatorImpl} in the DAG. + * Get the unique ID for this {@link OperatorImpl} in the DAG. 
* * Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are * 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an - * implementation specific name, e.g., for use in metrics. + * implementation specific id, e.g., for use in metrics. * - * @return the unique name for this {@link OperatorImpl} in the DAG + * @return the unique ID for this {@link OperatorImpl} in the DAG */ - protected String getOperatorName() { - return getOperatorSpec().getOpName(); + protected String getOpImplId() { + return getOperatorSpec().getOpId(); } private HighResolutionClock createHighResClock(Config config) { http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 808ddbf..49b29c8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -61,7 +61,7 @@ public class OperatorImplGraph { private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class); /** - * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating + * A mapping from operator IDs to their {@link OperatorImpl}s in this graph. Used to avoid creating * multiple {@link OperatorImpl}s for an {@link OperatorSpec} when it's reached from different * {@link OperatorSpec}s during DAG traversals (e.g., for the merge operator). * We use a LHM for deterministic ordering in initializing and closing operators. 
@@ -74,11 +74,11 @@ public class OperatorImplGraph { private final Map<SystemStream, InputOperatorImpl> inputOperators = new HashMap<>(); /** - * A mapping from {@link JoinOperatorSpec}s to their two {@link PartialJoinFunction}s. Used to associate + * A mapping from {@link JoinOperatorSpec} IDs to their two {@link PartialJoinFunction}s. Used to associate * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're * reached from different {@link OperatorSpec} during DAG traversals. */ - private final Map<Integer, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>(); + private final Map<String, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>(); private final Clock clock; @@ -155,13 +155,17 @@ public class OperatorImplGraph { */ OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, SystemStream inputStream, Config config, TaskContext context) { - if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) { + if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) { // Either this is the first time we've seen this operatorSpec, or this is a join operator spec // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG. OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context); operatorImpl.init(config, context); operatorImpl.registerInputStream(inputStream); - operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl); + + // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl). + // This is currently OK since we don't need to look up a partial join operator impl again during traversal + // (a join cannot have a cycle). 
+ operatorImpls.put(operatorImpl.getOpImplId(), operatorImpl); Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { @@ -172,7 +176,7 @@ public class OperatorImplGraph { } else { // the implementation corresponding to operatorSpec has already been instantiated // and registered, so we do not need to traverse the DAG further. - return operatorImpls.get(operatorSpec.getOpName()); + return operatorImpls.get(operatorSpec.getOpId()); } } @@ -244,7 +248,7 @@ public class OperatorImplGraph { @Override public void init(Config config, TaskContext context) { - String leftStoreName = joinOpSpec.getLeftOpName(); + String leftStoreName = joinOpSpec.getLeftOpId(); leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName); // user-defined joinFn should only be initialized once, so we do it only in left partial join function. @@ -276,7 +280,7 @@ public class OperatorImplGraph { @Override public void init(Config config, TaskContext context) { - String rightStoreName = joinOpSpec.getRightOpName(); + String rightStoreName = joinOpSpec.getRightOpId(); rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName); // user-defined joinFn should only be initialized once, http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index e976a43..90a71a0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -18,11 +18,13 @@ */ package 
org.apache.samza.operators.impl; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -68,13 +70,21 @@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> { @Override public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { - K key = thisPartialJoinFn.getKey(message); - thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis())); - TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key); - long now = clock.currentTimeMillis(); - if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) { - JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue()); - return Collections.singletonList(joinResult); + try { + KeyValueStore<K, TimestampedValue<M>> thisState = thisPartialJoinFn.getState(); + KeyValueStore<K, TimestampedValue<OM>> otherState = otherPartialJoinFn.getState(); + + K key = thisPartialJoinFn.getKey(message); + thisState.put(key, new TimestampedValue<>(message, clock.currentTimeMillis())); + TimestampedValue<OM> otherMessage = otherState.get(key); + + long now = clock.currentTimeMillis(); + if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) { + JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue()); + return Collections.singletonList(joinResult); + } + } catch (Exception e) { + throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e); } return Collections.emptyList(); } @@ -89,13 +99,13 
@@ class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> { } /** - * The name for this {@link PartialJoinOperatorImpl} that includes information about which + * The ID for this {@link PartialJoinOperatorImpl} that includes information about which * side of the join it is for. * - * @return the {@link PartialJoinOperatorImpl} name. + * @return the {@link PartialJoinOperatorImpl} ID. */ @Override - protected String getOperatorName() { - return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName(); + protected String getOpImplId() { + return isLeftSide ? joinOpSpec.getLeftOpId() : joinOpSpec.getRightOpId(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 42fe46a..e0b1240 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -112,7 +112,8 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje protected void handleInit(Config config, TaskContext context) { WindowInternal<M, K, Object> window = windowOpSpec.getWindow(); - KeyValueStore<TimeSeriesKey<K>, Object> store = (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpName()); + KeyValueStore<TimeSeriesKey<K>, Object> store = + (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId()); // For aggregating windows, we use the store in over-write mode since we only retain the aggregated // value. Else, we use the store in append-mode. 
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 3c66ee6..4c0687a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -38,7 +38,7 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // private final boolean isKeyedInput; public InputOperatorSpec(StreamSpec streamSpec, - Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, int opId) { + Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, String opId) { super(OpCode.INPUT, opId); this.streamSpec = streamSpec; this.keySerde = keySerde; http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 3f99280..c730bca 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -60,7 +60,7 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp */ JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, - long ttlMs, int opId) { + long ttlMs, String opId) { super(OpCode.JOIN, opId); this.leftInputOpSpec = 
leftInputOpSpec; this.rightInputOpSpec = rightInputOpSpec; @@ -74,8 +74,8 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp @Override public Collection<StoreDescriptor> getStoreDescriptors() { String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; - String leftStoreName = getLeftOpName(); - String rightStoreName = getRightOpName(); + String leftStoreName = getLeftOpId(); + String rightStoreName = getRightOpId(); Map<String, String> leftStoreCustomProps = ImmutableMap.of( String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs), String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete", @@ -105,12 +105,12 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp return rightInputOpSpec; } - public String getLeftOpName() { - return this.getOpName() + "-L"; + public String getLeftOpId() { + return this.getOpId() + "-L"; } - public String getRightOpName() { - return this.getOpName() + "-R"; + public String getRightOpId() { + return this.getOpId() + "-R"; } public JoinFunction<K, M, OM, JM> getJoinFn() { http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 71a9897..17f1b49 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -52,7 +52,7 @@ public abstract class OperatorSpec<M, OM> { OUTPUT } - private final int opId; + private final String opId; private final OpCode opCode; private StackTraceElement[] creationStackTrace; @@ -63,7 +63,7 @@ public abstract class OperatorSpec<M, OM> { */ 
private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>(); - public OperatorSpec(OpCode opCode, int opId) { + public OperatorSpec(OpCode opCode, String opId) { this.opCode = opCode; this.opId = opId; this.creationStackTrace = Thread.currentThread().getStackTrace(); @@ -93,7 +93,7 @@ public abstract class OperatorSpec<M, OM> { * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}. * @return the unique operator ID */ - public final int getOpId() { + public final String getOpId() { return this.opId; } @@ -128,13 +128,5 @@ public abstract class OperatorSpec<M, OM> { return String.format("%s:%s", element.getFileName(), element.getLineNumber()); } - /** - * Get the name for this operator based on its opCode and opId. - * @return the name for this operator - */ - public final String getOpName() { - return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId()); - } - abstract public WatermarkFunction getWatermarkFn(); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 8b2b177..1b3b8aa 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -28,6 +28,7 @@ import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.serializers.Serde; +import org.apache.samza.system.StreamSpec; import org.apache.samza.task.TaskContext; import java.util.ArrayList; @@ -43,6 +44,23 @@ public class OperatorSpecs { private OperatorSpecs() {} 
/** + * Creates an {@link InputOperatorSpec} for consuming input. + * + * @param streamSpec the stream spec for the input stream + * @param keySerde the serde for the input key + * @param valueSerde the serde for the input value + * @param isKeyed whether the input stream is keyed + * @param opId the unique ID of the operator + * @param <K> type of input key + * @param <V> type of input value + * @return the {@link InputOperatorSpec} + */ + public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec( + StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) { + return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, opId); + } + + /** * Creates a {@link StreamOperatorSpec} for {@link MapFunction} * * @param mapFn the map function @@ -52,7 +70,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec( - MapFunction<? super M, ? extends OM> mapFn, int opId) { + MapFunction<? super M, ? extends OM> mapFn, String opId) { return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() { @Override public Collection<OM> apply(M message) { @@ -87,7 +105,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec( - FilterFunction<? super M> filterFn, int opId) { + FilterFunction<? super M> filterFn, String opId) { return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() { @Override public Collection<M> apply(M message) { @@ -122,7 +140,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} */ public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec( - FlatMapFunction<? super M, ? extends OM> flatMapFn, int opId) { + FlatMapFunction<? super M, ? 
extends OM> flatMapFn, String opId) { return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId); } @@ -134,7 +152,7 @@ public class OperatorSpecs { * @param <M> type of input message * @return the {@link SinkOperatorSpec} for the sink operator */ - public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) { + public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, String opId) { return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, opId); } @@ -146,7 +164,7 @@ public class OperatorSpecs { * @param <M> the type of message in the {@link OutputStreamImpl} * @return the {@link OutputOperatorSpec} for the sendTo operator */ - public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, int opId) { + public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, String opId) { return new OutputOperatorSpec<>(outputStream, opId); } @@ -164,7 +182,7 @@ public class OperatorSpecs { */ public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec( OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction, - Function<? super M, ? extends V> valueFunction, int opId) { + Function<? super M, ? 
extends V> valueFunction, String opId) { return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId); } @@ -180,7 +198,7 @@ public class OperatorSpecs { */ public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec( - WindowInternal<M, WK, WV> window, int opId) { + WindowInternal<M, WK, WV> window, String opId) { return new WindowOperatorSpec<>(window, opId); } @@ -203,7 +221,7 @@ public class OperatorSpecs { */ public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec( OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn, - Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) { + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, String opId) { return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, keySerde, messageSerde, otherMessageSerde, ttlMs, opId); } @@ -215,7 +233,7 @@ public class OperatorSpecs { * @param <M> the type of input message * @return the {@link StreamOperatorSpec} for the merge */ - public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(int opId) { + public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(String opId) { return new StreamOperatorSpec<>(message -> new ArrayList<M>() { { http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index 862370f..6cb4fca 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@ -39,7 +39,7 @@ public class 
OutputOperatorSpec<M> extends OperatorSpec<M, Void> { * @param outputStream the {@link OutputStreamImpl} to send messages to * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph */ - OutputOperatorSpec(OutputStreamImpl<M> outputStream, int opId) { + OutputOperatorSpec(OutputStreamImpl<M> outputStream, String opId) { super(OpCode.SEND_TO, opId); this.outputStream = outputStream; } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index 42eeb4b..399c836 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@ -51,7 +51,7 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> { */ PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction, - Function<? super M, ? extends V> valueFunction, int opId) { + Function<? super M, ? 
extends V> valueFunction, String opId) { super(OpCode.PARTITION_BY, opId); this.outputStream = outputStream; this.keyFunction = keyFunction; http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 1145be8..1ca3801 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -41,7 +41,7 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> { * {@link org.apache.samza.task.TaskCoordinator}. * @param opId the unique ID of this {@link OperatorSpec} in the graph */ - SinkOperatorSpec(SinkFunction<M> sinkFn, int opId) { + SinkOperatorSpec(SinkFunction<M> sinkFn, String opId) { super(OpCode.SINK, opId); this.sinkFn = sinkFn; } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index aace2e2..b1e29c6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -39,7 +39,7 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} * @param opId the unique ID for this {@link StreamOperatorSpec} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, int 
opId) { + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) { super(opCode, opId); this.transformFn = transformFn; } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 3c8879f..06a4f4b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -58,7 +58,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK * @param window the window function * @param opId auto-generated unique ID of this operator */ - WindowOperatorSpec(WindowInternal<M, WK, WV> window, int opId) { + WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) { super(OpCode.WINDOW, opId); this.window = window; } @@ -124,7 +124,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK @Override public Collection<StoreDescriptor> getStoreDescriptors() { - String storeName = getOpName(); + String storeName = getOpId(); String storeFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; Serde storeKeySerde = new TimeSeriesKeySerde<>(window.getKeySerde()); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java index c75608f..f9e0a3a 100644 --- 
a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -54,7 +54,7 @@ public class KeyValueStoreExample implements StreamApplication { pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class))) + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") .map(KV::getValue) .flatMap(new MyStatsCounter()) .map(stats -> KV.of(stats.memberId, stats)) http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java index df393b0..ff785d9 100644 --- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -51,7 +51,7 @@ public class OrderShipmentJoinExample implements StreamApplication { orders .join(shipments, new MyJoinFunction(), new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), - Duration.ofMinutes(1)) + Duration.ofMinutes(1), "join") .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) .sendTo(fulfilledOrders); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index c40de7b..846b9f8 100644 --- 
a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -56,7 +56,7 @@ public class PageViewCounterExample implements StreamApplication { pageViewEvents .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null) .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) - .setAccumulationMode(AccumulationMode.DISCARDING)) + .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow") .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane))) .sendTo(pageViewEventPerMemberStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index c403406..c9bcc45 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -49,8 +49,9 @@ public class RepartitionExample implements StreamApplication { pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class))) - .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null)) + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") + .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), + "window") .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane))) .sendTo(pageViewEventPerMember); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java index 9381e49..3c37c31 100644 --- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java @@ -54,8 +54,9 @@ public class WindowExample implements StreamApplication { // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive // for 1 minute. inputStream - .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, null) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1))))) + .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde()) + .setLateTrigger(Triggers.any(Triggers.count(30000), + Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window") .map(WindowPane::getMessage) .sendTo(outputStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index ab20bba..0a3e9c8 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -111,7 +111,7 @@ public class TestExecutionPlanner { MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1"); OutputStream<KV<Object, Object>> output1 = 
streamGraph.getOutputStream("output1"); input1 - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p1") .map(kv -> kv) .sendTo(output1); return streamGraph; @@ -136,23 +136,23 @@ public class TestExecutionPlanner { .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); messageStream1 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") .sendTo(output1); messageStream3 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") .sendTo(output2); return streamGraph; @@ -166,35 +166,37 @@ public class TestExecutionPlanner { .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); 
OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); messageStream1.map(m -> m) - .filter(m -> true) - .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class))); + .filter(m->true) + .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), + mock(Serde.class), mock(Serde.class)), "w1"); messageStream2.map(m -> m) - .filter(m -> true) - .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class))); + .filter(m->true) + .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), + mock(Serde.class), mock(Serde.class)), "w2"); messageStream1 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1") .sendTo(output1); messageStream3 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2") .sendTo(output2); messageStream3 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3") .sendTo(output2); return streamGraph; @@ -242,12 +244,12 @@ public class TestExecutionPlanner { when(runner.getStreamSpec("output2")).thenReturn(output2); // intermediate streams used in tests - when(runner.getStreamSpec("test-app-1-partition_by-1")) - .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system")); - when(runner.getStreamSpec("test-app-1-partition_by-3")) - .thenReturn(new StreamSpec("test-app-1-partition_by-3", 
"test-app-1-partition_by-3", "default-system")); - when(runner.getStreamSpec("test-app-1-partition_by-8")) - .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-p1")) + .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-p2")) + .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-p3")) + .thenReturn(new StreamSpec("test-app-1-partition_by-p3", "test-app-1-partition_by-p3", "default-system")); } @Test @@ -429,7 +431,7 @@ public class TestExecutionPlanner { MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4"); OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); - input1.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(output1); + input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1); JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); // the partitions should be the same as input1 http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index 10c4aa3..ba5c922 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -85,10 +85,10 @@ public class TestJobGraphJsonGenerator { when(runner.getStreamSpec("output2")).thenReturn(output2); // intermediate streams used in tests - 
when(runner.getStreamSpec("test-app-1-partition_by-3")) - .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system")); - when(runner.getStreamSpec("test-app-1-partition_by-8")) - .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-p1")) + .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system")); + when(runner.getStreamSpec("test-app-1-partition_by-p2")) + .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system")); // set up external partition count Map<String, Integer> system1Map = new HashMap<>(); @@ -113,24 +113,24 @@ public class TestJobGraphJsonGenerator { .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) - .partitionBy(m -> m.key, m -> m.value) + .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1"); OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2"); messageStream1 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") .sendTo(outputStream1); messageStream2.sink((message, collector, coordinator) -> { }); messageStream3 .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") 
.sendTo(outputStream2); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java index f6ebaf9..53e8bf6 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@ -20,6 +20,7 @@ package org.apache.samza.execution; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.operators.KV; @@ -45,8 +46,11 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestJobNode { @@ -56,25 +60,31 @@ public class TestJobNode { StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system"); StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system"); StreamSpec outputSpec = new StreamSpec("output", "output", "output-system"); - StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system"); + StreamSpec partitionBySpec = + new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system"); doReturn(input1Spec).when(mockRunner).getStreamSpec("input1"); doReturn(input2Spec).when(mockRunner).getStreamSpec("input2"); doReturn(outputSpec).when(mockRunner).getStreamSpec("output"); - 
doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2"); + doReturn(partitionBySpec).when(mockRunner).getStreamSpec("jobName-jobId-partition_by-p1"); - StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + Config mockConfig = mock(Config.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); + + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1"); MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2"); OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output"); JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); input1 - .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value) + .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value) .join(input2.map(kv -> kv.value), mockJoinFn, - new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1)) + new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), + Duration.ofHours(1), "j1") .sendTo(output); - JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class)); + JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mockConfig); Config config = new MapConfig(); StreamEdge input1Edge = new StreamEdge(input1Spec, config); StreamEdge input2Edge = new StreamEdge(input2Spec, config); @@ -133,8 +143,8 @@ public class TestJobNode { assertTrue("Serialized output msg serde should be a StringSerde", outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-2.samza.key.serde"); - String partitionByMsgSerde = 
mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde"); + String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); + String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde"); assertTrue("Serialized serdes should contain intermediate stream key serde", deserializedSerdes.containsKey(partitionByKeySerde)); assertTrue("Serialized intermediate stream key serde should be a StringSerde", @@ -145,8 +155,8 @@ public class TestJobNode { "Serialized intermediate stream msg serde should be a StringSerde", partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde"); - String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde"); + String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde"); + String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde"); assertTrue("Serialized serdes should contain left join store key serde", deserializedSerdes.containsKey(leftJoinStoreKeySerde)); assertTrue("Serialized left join store key serde should be a StringSerde", @@ -156,8 +166,8 @@ public class TestJobNode { assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde", leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); - String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde"); - String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde"); + String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde"); + String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde"); assertTrue("Serialized serdes should contain right join store key serde", deserializedSerdes.containsKey(rightJoinStoreKeySerde)); assertTrue("Serialized right join store key serde should be a StringSerde", 
http://git-wip-us.apache.org/repos/asf/samza/blob/1296c7ff/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 09fb56a..dac4e94 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -20,8 +20,10 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; @@ -51,6 +53,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -77,6 +80,22 @@ public class TestJoinOperator { assertEquals(110, outputSum); } + @Test(expected = SamzaException.class) + public void joinWithSelfThrowsException() throws Exception { + StreamApplication app = new StreamApplication() { + @Override + public void init(StreamGraph graph, Config config) { + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde); + + inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); + } + }; + + createStreamOperatorTask(new 
SystemClock(), app); // should throw an exception + } + @Test public void joinFnInitAndClose() throws Exception { TestJoinFunction joinFn = new TestJoinFunction(); @@ -277,10 +296,14 @@ public class TestJoinOperator { // need to return different stores for left and right side IntegerSerde integerSerde = new IntegerSerde(); TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde)); - when(taskContext.getStore(eq("join-2-L"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); - when(taskContext.getStore(eq("join-2-R"))).thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); + when(taskContext.getStore(eq("jobName-jobId-join-j1-L"))) + .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); + when(taskContext.getStore(eq("jobName-jobId-join-j1-R"))) + .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); Config config = mock(Config.class); + when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock); sot.init(config, taskContext); @@ -304,10 +327,8 @@ public class TestJoinOperator { SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream - .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL) - .sink((message, messageCollector, taskCoordinator) -> { - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); + .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") + .sink((m, mc, tc) -> mc.send(new OutgoingMessageEnvelope(outputSystemStream, m))); } }
