Repository: samza
Updated Branches:
  refs/heads/master ae29a40d0 -> 3375c7116


SAMZA-1312: Add Control Messages and Intermediate Stream Serde

This patch adds the following control message types:
* EndOfStreamMessage
* WatermarkMessage

To support in-band data and control messages, we provide a wrapper serde 
(IntermediateMessageSerde) that serializes/deserializes data and control messages 
based on the message-type byte (the first byte of each intermediate stream message). 
The message format is defined in SAMZA-1312. The patch also integrates this serde 
with SerdeManager.

Tested with example jobs deployed locally; they work as expected.

Author: Xinyu Liu <[email protected]>

Reviewers: Jagadish V <[email protected]>

Closes #207 from xinyuiscool/SAMZA-1312


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3375c711
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3375c711
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3375c711

Branch: refs/heads/master
Commit: 3375c7116ef68d845b4f96cf5d1b9291397d79ec
Parents: ae29a40
Author: Xinyu Liu <[email protected]>
Authored: Tue Jun 13 09:46:02 2017 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Tue Jun 13 09:46:02 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/execution/JobGraph.java    |  20 ++-
 .../org/apache/samza/execution/JobNode.java     |  21 ++-
 .../org/apache/samza/execution/StreamEdge.java  |  10 ++
 .../apache/samza/message/ControlMessage.java    |  52 +++++++
 .../samza/message/EndOfStreamMessage.java       |  36 +++++
 .../org/apache/samza/message/MessageType.java   |  46 ++++++
 .../apache/samza/message/WatermarkMessage.java  |  43 ++++++
 .../serializers/IntermediateMessageSerde.java   | 141 +++++++++++++++++++
 .../org/apache/samza/config/StreamConfig.scala  |  19 ++-
 .../apache/samza/container/SamzaContainer.scala |  29 +++-
 .../apache/samza/serializers/SerdeManager.scala |  55 +++++---
 .../TestIntermediateMessageSerde.java           | 131 +++++++++++++++++
 .../samza/serializers/TestSerdeManager.scala    |  62 ++++++++
 13 files changed, 638 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 35f27ab..99ee86c 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -130,7 +130,7 @@ import org.slf4j.LoggerFactory;
    * @param to the target node
    */
   void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
-    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec, true);
     edge.addSourceNode(from);
     edge.addTargetNode(to);
     from.addOutEdge(edge);
@@ -160,16 +160,32 @@ import org.slf4j.LoggerFactory;
    * @return stream edge
    */
   StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
+    return getOrCreateStreamEdge(streamSpec, false);
+  }
+
+  /**
+   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it 
does not exist.
+   * @param streamSpec  spec of the StreamEdge
+   * @param isIntermediate  boolean flag indicating whether it's an 
intermediate stream
+   * @return stream edge
+   */
+  StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean 
isIntermediate) {
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      edge = new StreamEdge(streamSpec);
+      edge = new StreamEdge(streamSpec, isIntermediate);
       edges.put(streamId, edge);
     }
     return edge;
   }
 
   /**
+   * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it 
does not exist.
+   * @param streamSpec spec of the StreamEdge
+   * @return stream edge
+   */
+
+  /**
    * Returns the job nodes to be executed in the topological order
    * @return unmodifiable list of {@link JobNode}
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index c42e1cc..88b24ba 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -29,12 +29,14 @@ import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,10 +122,13 @@ public class JobNode {
       }
     }
 
-    log.info("Job {} has generated configs {}", jobName, configs);
-
     configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
 
+    // write the configs of intermediate input streams (in-edges) to configs
+    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> 
addStreamConfig(edge, configs));
+
+    log.info("Job {} has generated configs {}", jobName, configs);
+
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
     // TODO: Disallow user specifying job inputs/outputs. This info comes 
strictly from the pipeline.
     return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new 
MapConfig(configs), configPrefix)));
@@ -185,6 +190,18 @@ public class JobNode {
     return scopedConfig;
   }
 
+  private static void addStreamConfig(StreamEdge edge, Map<String, String> 
config) {
+    StreamSpec spec = edge.getStreamSpec();
+    config.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
spec.getId()), spec.getSystemName());
+    config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
spec.getId()), spec.getPhysicalName());
+    if (edge.isIntermediate()) {
+      config.put(String.format(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID(), 
spec.getId()), "true");
+    }
+    spec.getConfig().forEach((property, value) -> {
+        config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
spec.getId()) + property, value);
+      });
+  }
+
   static String createId(String jobName, String jobId) {
     return String.format("%s-%s", jobName, jobId);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 9596d0f..35fde81 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -40,10 +40,16 @@ public class StreamEdge {
 
   private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
+  private final boolean isIntermediate;
 
   StreamEdge(StreamSpec streamSpec) {
+    this(streamSpec, false);
+  }
+
+  StreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
     this.streamSpec = streamSpec;
     this.name = Util.getNameFromSystemStream(getSystemStream());
+    this.isIntermediate = isIntermediate;
   }
 
   void addSourceNode(JobNode sourceNode) {
@@ -93,4 +99,8 @@ public class StreamEdge {
   void setName(String name) {
     this.name = name;
   }
+
+  boolean isIntermediate() {
+    return isIntermediate;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java 
b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
new file mode 100644
index 0000000..46bf559
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.message;
+
+/**
+ * The abstract class of all control messages, containing
+ * the task that produces the control message, the total number of producer 
tasks,
+ * and a version number.
+ */
+public abstract class ControlMessage {
+  private final String taskName;
+  private final int taskCount;
+  private int version = 1;
+
+  public ControlMessage(String taskName, int taskCount) {
+    this.taskName = taskName;
+    this.taskCount = taskCount;
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public int getTaskCount() {
+    return taskCount;
+  }
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java 
b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
new file mode 100644
index 0000000..91981a9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.message;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The EndOfStreamMessage is a control message that is sent to the next stage
+ *  once the task has consumed a bounded stream to its end.
+ */
+public class EndOfStreamMessage extends ControlMessage {
+
+  @JsonCreator
+  public EndOfStreamMessage(@JsonProperty("task-name") String taskName,
+                            @JsonProperty("task-count") int taskCount) {
+    super(taskName, taskCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/MessageType.java 
b/samza-core/src/main/java/org/apache/samza/message/MessageType.java
new file mode 100644
index 0000000..b1199b6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/message/MessageType.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.message;
+
+/**
+ * The type of the intermediate stream message. The enum will be encoded using 
its ordinal value and
+ * put in the first byte of the serialization of intermediate message.
+ * For more details, see {@link 
org.apache.samza.serializers.IntermediateMessageSerde}
+ */
+public enum MessageType {
+  USER_MESSAGE,
+  WATERMARK,
+  END_OF_STREAM;
+
+  /**
+   * Returns the {@link MessageType} of a particular intermediate stream 
message.
+   * @param message an intermediate stream message
+   * @return type of the message
+   */
+  public static MessageType of(Object message) {
+    if (message instanceof WatermarkMessage) {
+      return WATERMARK;
+    } else if (message instanceof EndOfStreamMessage) {
+      return END_OF_STREAM;
+    } else {
+      return USER_MESSAGE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java 
b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
new file mode 100644
index 0000000..aa25742
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.message;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The WatermarkMessage is a control message that is sent to the next stage
+ *  with a watermark timestamp and the task that produced the watermark.
+ */
+public class WatermarkMessage extends ControlMessage {
+  private final long timestamp;
+
+  @JsonCreator
+  public WatermarkMessage(@JsonProperty("timestamp") long timestamp,
+                          @JsonProperty("task-name") String taskName,
+                          @JsonProperty("task-count") int taskCount) {
+    super(taskName, taskCount);
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
new file mode 100644
index 0000000..26ef92c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Arrays;
+import org.apache.samza.SamzaException;
+import org.apache.samza.message.EndOfStreamMessage;
+import org.apache.samza.message.MessageType;
+import org.apache.samza.message.WatermarkMessage;
+import org.codehaus.jackson.type.TypeReference;
+
+
+/**
+ * This class provides serialization/deserialization of the intermediate 
messages.
+ *
+ * The message format of an intermediate stream is below:
+ *
+ * IntermediateStreamMessage: {
+ *   MessageType : int8
+ *   MessageData : byte[]
+ * }
+ *
+ * MessageType: [0(UserMessage), 1(Watermark), 2(EndOfStream)]
+ * MessageData: [UserMessage/ControlMessage]
+ * ControlMessage:
+ *   Version   : int
+ *   TaskName  : string
+ *   TaskCount : int
+ *   Other Message Data (based on different types of control message)
+ *
+ * For user message, we use the user message serde.
+ * For control message, we use json serde.
+ */
+public class IntermediateMessageSerde implements Serde<Object> {
+
+  private static final class WatermarkSerde extends 
JsonSerde<WatermarkMessage> {
+    @Override
+    public WatermarkMessage fromBytes(byte[] bytes) {
+      try {
+        return mapper().readValue(new String(bytes, "UTF-8"), new 
TypeReference<WatermarkMessage>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  private static final class EndOfStreamSerde extends 
JsonSerde<EndOfStreamMessage> {
+    @Override
+    public EndOfStreamMessage fromBytes(byte[] bytes) {
+      try {
+        return mapper().readValue(new String(bytes, "UTF-8"), new 
TypeReference<EndOfStreamMessage>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  private final Serde userMessageSerde;
+  private final Serde<WatermarkMessage> watermarkSerde;
+  private final Serde<EndOfStreamMessage> eosSerde;
+
+  public IntermediateMessageSerde(Serde userMessageSerde) {
+    this.userMessageSerde = userMessageSerde;
+    this.watermarkSerde = new WatermarkSerde();
+    this.eosSerde = new EndOfStreamSerde();
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    try {
+      final Object object;
+      final MessageType type = MessageType.values()[bytes[0]];
+      final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length);
+      switch (type) {
+        case USER_MESSAGE:
+          object = userMessageSerde.fromBytes(data);
+          break;
+        case WATERMARK:
+          object = watermarkSerde.fromBytes(data);
+          break;
+        case END_OF_STREAM:
+          object = eosSerde.fromBytes(data);
+          break;
+        default:
+          throw new UnsupportedOperationException(String.format("Message type 
%s is not supported", type.name()));
+      }
+      return object;
+
+    } catch (UnsupportedOperationException ue) {
+      throw new SamzaException(ue);
+    } catch (Exception e) {
+      // This will catch the following exceptions:
+      // 1) the first byte is not a valid type (or the array is empty), so it 
will cause an ArrayIndexOutOfBoundsException
+      // 2) the first byte happens to be a valid type, but the deserialization 
fails with some exception
+      // In these cases, we fall back to the user-provided serde
+      return userMessageSerde.fromBytes(bytes);
+    }
+  }
+
+  @Override
+  public byte[] toBytes(Object object) {
+    final byte [] data;
+    final MessageType type = MessageType.of(object);
+    switch (type) {
+      case USER_MESSAGE:
+        data = userMessageSerde.toBytes(object);
+        break;
+      case WATERMARK:
+        data = watermarkSerde.toBytes((WatermarkMessage) object);
+        break;
+      case END_OF_STREAM:
+        data = eosSerde.toBytes((EndOfStreamMessage) object);
+        break;
+      default:
+        throw new SamzaException("Unknown message type: " + type.name());
+    }
+
+    final byte [] bytes = new byte[data.length + 1];
+    bytes[0] = (byte) type.ordinal();
+    System.arraycopy(data, 0, bytes, 1, data.length);
+
+    return bytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 389a883..20192fb 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -36,14 +36,16 @@ object StreamConfig {
   val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default"
   val BOOTSTRAP =               SAMZA_PROPERTY + "bootstrap"
   val PRIORITY =                SAMZA_PROPERTY + "priority"
+  val IS_INTERMEDIATE =         SAMZA_PROPERTY + "intermediate"
 
   // We don't want any external dependencies on these patterns while both 
exist. Use getProperty to ensure proper values.
   private val STREAMS_PREFIX = "streams."
   private val STREAM_PREFIX = "systems.%s.streams.%s."
 
-  protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
-  protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
+  val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
+  val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
   val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
+  val IS_INTERMEDIATE_FROM_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
@@ -152,6 +154,15 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   }
 
   /**
+   * Gets the boolean flag of whether the specified streamId is an 
intermediate stream
+   * @param streamId  the identifier for the stream in the config.
+   * @return          true if the stream is intermediate
+   */
+  def getIsIntermediate(streamId: String) = {
+    getBoolean(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID format streamId, 
false)
+  }
+
+  /**
    * Gets the stream IDs of all the streams defined in the config
    * @return collection of stream IDs
    */
@@ -259,7 +270,7 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
     getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
   }
 
-  private def systemStreamToStreamId(systemStream: SystemStream): String = {
+  def systemStreamToStreamId(systemStream: SystemStream): String = {
    val streamIds = 
getStreamIdsForSystem(systemStream.getSystem).filter(streamId => 
systemStream.getStream().equals(getPhysicalName(streamId)))
     if (streamIds.size > 1) {
       throw new IllegalStateException("There was more than one stream found 
for system stream %s" format(systemStream))
@@ -276,7 +287,7 @@ class StreamConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
     * A streamId is translated to a SystemStream by looking up its System and 
physicalName. It
     * will use the streamId as the stream name if the physicalName doesn't 
exist.
     */
-  private def streamIdToSystemStream(streamId: String): SystemStream = {
+  def streamIdToSystemStream(streamId: String): SystemStream = {
     new SystemStream(getSystem(streamId), getPhysicalName(streamId))
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e1d7352..4f5df94 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -25,6 +25,8 @@ import java.util
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.net.{URL, UnknownHostException}
 
+import org.apache.samza.serializers.IntermediateMessageSerde
+import org.apache.samza.serializers.StringSerde
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -287,13 +289,38 @@ object SamzaContainer extends Logging {
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
+    val intermediateStreams = config
+      .getStreamIds
+      .filter(config.getIsIntermediate(_))
+      .toList
+
+    info("Got intermediate streams: %s" format intermediateStreams)
+
+    val controlMessageKeySerdes = intermediateStreams
+      .flatMap(streamId => {
+        val systemStream = config.streamIdToSystemStream(streamId)
+        systemStreamKeySerdes.get(systemStream)
+                .orElse(systemKeySerdes.get(systemStream.getSystem))
+                .map(serde => (systemStream, new StringSerde("UTF-8")))
+      }).toMap
+
+    val intermediateStreamMessageSerdes = intermediateStreams
+      .flatMap(streamId => {
+        val systemStream = config.streamIdToSystemStream(streamId)
+        systemStreamMessageSerdes.get(systemStream)
+                .orElse(systemMessageSerdes.get(systemStream.getSystem))
+                .map(serde => (systemStream, new 
IntermediateMessageSerde(serde)))
+      }).toMap
+
     val serdeManager = new SerdeManager(
       serdes = serdes,
       systemKeySerdes = systemKeySerdes,
       systemMessageSerdes = systemMessageSerdes,
       systemStreamKeySerdes = systemStreamKeySerdes,
       systemStreamMessageSerdes = systemStreamMessageSerdes,
-      changeLogSystemStreams = changeLogSystemStreams.values.toSet)
+      changeLogSystemStreams = changeLogSystemStreams.values.toSet,
+      controlMessageKeySerdes = controlMessageKeySerdes,
+      intermediateMessageSerdes = intermediateStreamMessageSerdes)
 
     info("Setting up JVM metrics.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala 
b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 4540bce..0100c78 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -20,7 +20,8 @@
 package org.apache.samza.serializers
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.SerializerConfig
+import org.apache.samza.message.ControlMessage
+import org.apache.samza.message.WatermarkMessage
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.IncomingMessageEnvelope
@@ -32,7 +33,9 @@ class SerdeManager(
   systemMessageSerdes: Map[String, Serde[Object]] = Map(),
   systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(),
   systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(),
-  changeLogSystemStreams: Set[SystemStream] = Set()) {
+  changeLogSystemStreams: Set[SystemStream] = Set(),
+  controlMessageKeySerdes: Map[SystemStream, Serde[String]] = Map(),
+  intermediateMessageSerdes: Map[SystemStream, Serde[Object]] = Map()) {
 
   def toBytes(obj: Object, serializerName: String) = serdes
     .getOrElse(serializerName, throw new SamzaException("No serde defined for 
%s" format serializerName))
@@ -43,6 +46,10 @@ class SerdeManager(
       || 
envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX))
 {
       // If the stream is a change log stream, don't do any serde. It is up to 
storage engines to handle serde.
       envelope.getKey
+    } else if (envelope.getMessage.isInstanceOf[ControlMessage]
+      && controlMessageKeySerdes.contains(envelope.getSystemStream)) {
+      // If the message is a control message and its key needs to be serialized
+      
controlMessageKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey.asInstanceOf[String])
     } else if (envelope.getKeySerializerName != null) {
       // If a serde is defined, use it.
       toBytes(envelope.getKey, envelope.getKeySerializerName)
@@ -61,6 +68,9 @@ class SerdeManager(
       || 
envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX))
 {
       // If the stream is a change log stream, don't do any serde. It is up to 
storage engines to handle serde.
       envelope.getMessage
+    } else if (intermediateMessageSerdes.contains(envelope.getSystemStream)) {
+      // If the stream is an intermediate stream, use the intermediate message 
serde
+      
intermediateMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage)
     } else if (envelope.getMessageSerializerName != null) {
       // If a serde is defined, use it.
       toBytes(envelope.getMessage, envelope.getMessageSerializerName)
@@ -93,34 +103,43 @@ class SerdeManager(
     .fromBytes(bytes)
 
   def fromBytes(envelope: IncomingMessageEnvelope) = {
-    val key = if 
(changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
-      || 
envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)
 ) {
+    val systemStream = envelope.getSystemStreamPartition.getSystemStream
+
+    val message = if (changeLogSystemStreams.contains(systemStream)
+      || 
systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
       // If the stream is a change log stream, don't do any serde. It is up to 
storage engines to handle serde.
-      envelope.getKey
-    } else if 
(systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) {
+      envelope.getMessage
+    } else if (intermediateMessageSerdes.contains(systemStream)) {
+      // If the stream is an intermediate stream, use the intermediate message 
serde
+      
intermediateMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else if (systemStreamMessageSerdes.contains(systemStream)) {
       // If the stream has a serde defined, use it.
-      
systemStreamKeySerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
-    } else if 
(systemKeySerdes.contains(envelope.getSystemStreamPartition.getSystem)) {
+      
systemStreamMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+    } else if (systemMessageSerdes.contains(systemStream.getSystem)) {
       // If the system has a serde defined, use it.
-      
systemKeySerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+      
systemMessageSerdes(systemStream.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
     } else {
       // Just use the object.
-      envelope.getKey
+      envelope.getMessage
     }
 
-    val message = if 
(changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
-      || 
envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX))
 {
+    val key = if (changeLogSystemStreams.contains(systemStream)
+      || 
systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) {
       // If the stream is a change log stream, don't do any serde. It is up to 
storage engines to handle serde.
-      envelope.getMessage
-    } else if 
(systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) {
+      envelope.getKey
+    } else if (message.isInstanceOf[ControlMessage]
+      && controlMessageKeySerdes.contains(systemStream)) {
+      // If the message is a control message and the key needs to deserialize
+      
controlMessageKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else if (systemStreamKeySerdes.contains(systemStream)) {
       // If the stream has a serde defined, use it.
-      
systemStreamMessageSerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
-    } else if 
(systemMessageSerdes.contains(envelope.getSystemStreamPartition.getSystem)) {
+      
systemStreamKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
+    } else if (systemKeySerdes.contains(systemStream.getSystem)) {
       // If the system has a serde defined, use it.
-      
systemMessageSerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]])
+      
systemKeySerdes(systemStream.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]])
     } else {
       // Just use the object.
-      envelope.getMessage
+      envelope.getKey
     }
 
     if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
new file mode 100644
index 0000000..5b76bba
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model.serializers;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.samza.message.EndOfStreamMessage;
+import org.apache.samza.message.MessageType;
+import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.serializers.IntermediateMessageSerde;
+import org.apache.samza.serializers.Serde;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Round-trip tests for {@link IntermediateMessageSerde} covering user messages,
+ * watermark messages and end-of-stream messages.
+ */
+public class TestIntermediateMessageSerde {
+
+  /**
+   * Test-only serde based on Java serialization. Failures are rethrown instead of being
+   * mapped to null, so a broken round trip fails the test with its real cause rather than
+   * a later NullPointerException or ClassCastException.
+   */
+  static final class ObjectSerde implements Serde<Object> {
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+        return ois.readObject();
+      } catch (IOException | ClassNotFoundException e) {
+        throw new IllegalStateException("Failed to deserialize object", e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(Object object) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      // The object stream must be closed (and therefore flushed) before the buffer is read.
+      try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+        oos.writeObject(object);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to serialize object: " + object, e);
+      }
+      return baos.toByteArray();
+    }
+  }
+
+  /** Serializable user payload used to exercise the user-message path of the serde. */
+  static final class TestUserMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String message;
+    private final long timestamp;
+    private final int offset;
+
+    public TestUserMessage(String message, int offset, long timestamp) {
+      this.message = message;
+      this.offset = offset;
+      this.timestamp = timestamp;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public int getOffset() {
+      return offset;
+    }
+  }
+
+  @Test
+  public void testUserMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String msg = "this is a test message";
+    long timestamp = System.currentTimeMillis();
+    TestUserMessage userMessage = new TestUserMessage(msg, 0, timestamp);
+    byte[] bytes = imserde.toBytes(userMessage);
+    Object result = imserde.fromBytes(bytes);
+    // Check the type first so a mismatch reports as an assertion failure, not a ClassCastException.
+    assertTrue(result instanceof TestUserMessage);
+    TestUserMessage de = (TestUserMessage) result;
+    assertEquals(MessageType.USER_MESSAGE, MessageType.of(de));
+    assertEquals(msg, de.getMessage());
+    assertEquals(0, de.getOffset());
+    assertEquals(timestamp, de.getTimestamp());
+  }
+
+  @Test
+  public void testWatermarkMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String taskName = "task-1";
+    long timestamp = System.currentTimeMillis();
+    WatermarkMessage watermark = new WatermarkMessage(timestamp, taskName, 8);
+    byte[] bytes = imserde.toBytes(watermark);
+    Object result = imserde.fromBytes(bytes);
+    assertTrue(result instanceof WatermarkMessage);
+    WatermarkMessage de = (WatermarkMessage) result;
+    assertEquals(MessageType.WATERMARK, MessageType.of(de));
+    assertEquals(taskName, de.getTaskName());
+    assertEquals(8, de.getTaskCount());
+    assertEquals(timestamp, de.getTimestamp());
+  }
+
+  @Test
+  public void testEndOfStreamMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String taskName = "task-1";
+    EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8);
+    byte[] bytes = imserde.toBytes(eos);
+    Object result = imserde.fromBytes(bytes);
+    assertTrue(result instanceof EndOfStreamMessage);
+    EndOfStreamMessage de = (EndOfStreamMessage) result;
+    assertEquals(MessageType.END_OF_STREAM, MessageType.of(de));
+    assertEquals(taskName, de.getTaskName());
+    assertEquals(8, de.getTaskCount());
+    assertEquals(1, de.getVersion());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 646d0c8..9d808cb 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -19,7 +19,12 @@
 
 package org.apache.samza.serializers
 
+
+import org.apache.samza.message.EndOfStreamMessage
+import org.apache.samza.message.WatermarkMessage
 import org.junit.Assert._
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertEquals
 import org.junit.Test
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemStream
@@ -42,4 +47,61 @@ class TestSerdeManager {
     val deserialized = new SerdeManager().fromBytes(original)
     assertSame(original, deserialized)
   }
+
+  @Test
+  def testIntermediateMessageSerde {
+    val output = new SystemStream("my-system", "output")
+    val intermediate = new SystemStream("my-system", "intermediate")
+    val intSerde = (new IntegerSerde).asInstanceOf[Serde[Object]]
+    val systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(output -> intSerde)
+    val systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(output -> intSerde)
+    val controlMessageKeySerdes: Map[SystemStream, Serde[String]] =
+      Map(intermediate -> new StringSerde("UTF-8"))
+    val intermediateMessageSerdes: Map[SystemStream, Serde[Object]] =
+      Map(intermediate -> new IntermediateMessageSerde(intSerde))
+
+    val serdeManager = new SerdeManager(systemStreamKeySerdes = systemStreamKeySerdes,
+                                        systemStreamMessageSerdes = systemStreamMessageSerdes,
+                                        controlMessageKeySerdes = controlMessageKeySerdes,
+                                        intermediateMessageSerdes = intermediateMessageSerdes)
+
+    // Serializes (key, message) for the given stream with serdeManager, then deserializes the
+    // resulting bytes as if they had been read back from partition 0 of the same stream.
+    def roundTrip(stream: SystemStream, key: Any, message: Any): IncomingMessageEnvelope = {
+      val serialized = serdeManager.toBytes(new OutgoingMessageEnvelope(stream, key, message))
+      val incoming = new IncomingMessageEnvelope(new SystemStreamPartition(stream, new Partition(0)),
+                                                 "offset", serialized.getKey, serialized.getMessage)
+      serdeManager.fromBytes(incoming)
+    }
+
+    // user message sent to a regular output stream
+    val userOut = roundTrip(output, 1, 1000)
+    assertEquals(1, userOut.getKey)
+    assertEquals(1000, userOut.getMessage)
+
+    // user message sent to an intermediate stream
+    val userIntermediate = roundTrip(intermediate, 1, 1000)
+    assertEquals(1, userIntermediate.getKey)
+    assertEquals(1000, userIntermediate.getMessage)
+
+    val taskName = "task 1"
+    val taskCount = 8
+
+    // end-of-stream control message sent to an intermediate stream
+    val eos = roundTrip(intermediate, "eos", new EndOfStreamMessage(taskName, taskCount))
+    assertEquals("eos", eos.getKey)
+    val eosMsg = eos.getMessage.asInstanceOf[EndOfStreamMessage]
+    assertEquals(taskName, eosMsg.getTaskName)
+    assertEquals(taskCount, eosMsg.getTaskCount)
+
+    // watermark control message sent to an intermediate stream
+    val timestamp = System.currentTimeMillis()
+    val watermark = roundTrip(intermediate, "watermark", new WatermarkMessage(timestamp, taskName, taskCount))
+    assertEquals("watermark", watermark.getKey)
+    val watermarkMsg = watermark.getMessage.asInstanceOf[WatermarkMessage]
+    assertEquals(timestamp, watermarkMsg.getTimestamp)
+    assertEquals(taskName, watermarkMsg.getTaskName)
+    assertEquals(taskCount, watermarkMsg.getTaskCount)
+  }
 }
\ No newline at end of file

Reply via email to