Repository: samza
Updated Branches:
  refs/heads/master 8b3fe5d26 -> ad80cf9f1
SAMZA-1109: Updated High Level API serde impl with Yi's feedback

nickpan47 for review.

Author: Prateek Maheshwari <[email protected]>

Reviewers: "Jagadish Venkatraman <[email protected]>"

Closes #310 from prateekm/serde-updates

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad80cf9f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad80cf9f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad80cf9f

Branch: refs/heads/master
Commit: ad80cf9f1be427878849a61d77cf1a76381e7642
Parents: 8b3fe5d
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Oct 4 15:28:37 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Oct 4 15:28:37 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/execution/JobNode.java     |  2 +-
 .../apache/samza/operators/StreamGraphImpl.java | 33 +++++++++++++++-----
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 7ff43ed..0368829 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -141,7 +141,7 @@ public class JobNode {
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
-    // Disallow user specified job inputs/outputs. This info comes strictly from the pipeline.
+    // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
     Map<String, String> allowedConfigs = new HashMap<>(config);
     if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
       log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "

http://git-wip-us.apache.org/repos/asf/samza/blob/ad80cf9f/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 45378c7..a02ed3e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -80,15 +80,24 @@ public class StreamGraphImpl implements StreamGraph {
   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
-    Preconditions.checkState(!inputOperators.containsKey(runner.getStreamSpec(streamId)),
+    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
         "getInputStream must not be called multiple times with the same streamId: " + streamId);
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    boolean isKeyedInput = serde instanceof KVSerde;
+    if (outputStreams.containsKey(streamSpec)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+      Serde keySerde = outputStream.getKeySerde();
+      Serde valueSerde = outputStream.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
     inputOperators.put(streamSpec,
-        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyedInput, this.getNextOpId()));
+        new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
     return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
   }
@@ -99,14 +108,24 @@ public class StreamGraphImpl implements StreamGraph {
   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
-    Preconditions.checkState(!outputStreams.containsKey(runner.getStreamSpec(streamId)),
+    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamSpec)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+      Serde keySerde = inputOperatorSpec.getKeySerde();
+      Serde valueSerde = inputOperatorSpec.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
     outputStreams.put(streamSpec,
-        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), serde instanceof KVSerde));
+        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return outputStreams.get(streamSpec);
   }
