Repository: samza Updated Branches: refs/heads/master ae29a40d0 -> 3375c7116
SAMZA-1312: Add Control Messages and Intermediate Stream Serde

In this patch, we add the following control message types:
* EndOfStreamMessage
* WatermarkMessage

To support in-band data and control messages, we provide a wrapper serde (IntermediateMessageSerde) that serializes/deserializes data and control messages based on the message-type byte (the first byte of each intermediate stream message). The message format is defined in SAMZA-1312. The patch integrates this serde with SerdeManager.

Tested with example jobs deployed locally; they work as expected.

Author: Xinyu Liu <[email protected]>
Reviewers: Jagadish V <[email protected]>
Closes #207 from xinyuiscool/SAMZA-1312

Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3375c711 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3375c711 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3375c711 Branch: refs/heads/master Commit: 3375c7116ef68d845b4f96cf5d1b9291397d79ec Parents: ae29a40 Author: Xinyu Liu <[email protected]> Authored: Tue Jun 13 09:46:02 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Tue Jun 13 09:46:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/execution/JobGraph.java | 20 ++- .../org/apache/samza/execution/JobNode.java | 21 ++- .../org/apache/samza/execution/StreamEdge.java | 10 ++ .../apache/samza/message/ControlMessage.java | 52 +++++++ .../samza/message/EndOfStreamMessage.java | 36 +++++ .../org/apache/samza/message/MessageType.java | 46 ++++++ .../apache/samza/message/WatermarkMessage.java | 43 ++++++ .../serializers/IntermediateMessageSerde.java | 141 +++++++++++++++++++ .../org/apache/samza/config/StreamConfig.scala | 19 ++- .../apache/samza/container/SamzaContainer.scala | 29 +++- .../apache/samza/serializers/SerdeManager.scala | 55 +++++--- .../TestIntermediateMessageSerde.java | 131 +++++++++++++++++
.../samza/serializers/TestSerdeManager.scala | 62 ++++++++ 13 files changed, 638 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index 35f27ab..99ee86c 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -130,7 +130,7 @@ import org.slf4j.LoggerFactory; * @param to the target node */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) { - StreamEdge edge = getOrCreateStreamEdge(streamSpec); + StreamEdge edge = getOrCreateStreamEdge(streamSpec, true); edge.addSourceNode(from); edge.addTargetNode(to); from.addOutEdge(edge); @@ -160,16 +160,32 @@ import org.slf4j.LoggerFactory; * @return stream edge */ StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) { + return getOrCreateStreamEdge(streamSpec, false); + } + + /** + * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist. + * @param streamSpec spec of the StreamEdge + * @param isIntermediate boolean flag indicating whether it's an intermediate stream + * @return stream edge + */ + StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) { String streamId = streamSpec.getId(); StreamEdge edge = edges.get(streamId); if (edge == null) { - edge = new StreamEdge(streamSpec); + edge = new StreamEdge(streamSpec, isIntermediate); edges.put(streamId, edge); } return edge; } /** + * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist. 
+ * @param streamSpec spec of the StreamEdge + * @return stream edge + */ + + /** * Returns the job nodes to be executed in the topological order * @return unmodifiable list of {@link JobNode} */ http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index c42e1cc..88b24ba 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -29,12 +29,14 @@ import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.util.MathUtils; +import org.apache.samza.system.StreamSpec; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,10 +122,13 @@ public class JobNode { } } - log.info("Job {} has generated configs {}", jobName, configs); - configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); + // write input/output streams to configs + inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs)); + + log.info("Job {} has generated configs {}", jobName, configs); + String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline. 
return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix))); @@ -185,6 +190,18 @@ public class JobNode { return scopedConfig; } + private static void addStreamConfig(StreamEdge edge, Map<String, String> config) { + StreamSpec spec = edge.getStreamSpec(); + config.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName()); + config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName()); + if (edge.isIntermediate()) { + config.put(String.format(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID(), spec.getId()), "true"); + } + spec.getConfig().forEach((property, value) -> { + config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value); + }); + } + static String createId(String jobName, String jobId) { return String.format("%s-%s", jobName, jobId); } http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java index 9596d0f..35fde81 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java @@ -40,10 +40,16 @@ public class StreamEdge { private String name = ""; private int partitions = PARTITIONS_UNKNOWN; + private final boolean isIntermediate; StreamEdge(StreamSpec streamSpec) { + this(streamSpec, false); + } + + StreamEdge(StreamSpec streamSpec, boolean isIntermediate) { this.streamSpec = streamSpec; this.name = Util.getNameFromSystemStream(getSystemStream()); + this.isIntermediate = isIntermediate; } void addSourceNode(JobNode sourceNode) { @@ -93,4 +99,8 @@ public class StreamEdge { void setName(String name) { this.name = name; } + + 
boolean isIntermediate() { + return isIntermediate; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java new file mode 100644 index 0000000..46bf559 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.message; + +/** + * The abstract class of all control messages, containing + * the task that produces the control message, the total number of producer tasks, + * and a version number. 
+ */ +public abstract class ControlMessage { + private final String taskName; + private final int taskCount; + private int version = 1; + + public ControlMessage(String taskName, int taskCount) { + this.taskName = taskName; + this.taskCount = taskCount; + } + + public String getTaskName() { + return taskName; + } + + public int getTaskCount() { + return taskCount; + } + + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java new file mode 100644 index 0000000..91981a9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.message; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * The EndOfStreamMessage is a control message that is sent out to next stage + * once the task has consumed to the end of a bounded stream. + */ +public class EndOfStreamMessage extends ControlMessage { + + @JsonCreator + public EndOfStreamMessage(@JsonProperty("task-name") String taskName, + @JsonProperty("task-count") int taskCount) { + super(taskName, taskCount); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/MessageType.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/message/MessageType.java b/samza-core/src/main/java/org/apache/samza/message/MessageType.java new file mode 100644 index 0000000..b1199b6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/message/MessageType.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.message; + +/** + * The type of the intermediate stream message. 
The enum will be encoded using its ordinal value and + * put in the first byte of the serialization of intermediate message. + * For more details, see {@link org.apache.samza.serializers.IntermediateMessageSerde} + */ +public enum MessageType { + USER_MESSAGE, + WATERMARK, + END_OF_STREAM; + + /** + * Returns the {@link MessageType} of a particular intermediate stream message. + * @param message an intermediate stream message + * @return type of the message + */ + public static MessageType of(Object message) { + if (message instanceof WatermarkMessage) { + return WATERMARK; + } else if (message instanceof EndOfStreamMessage) { + return END_OF_STREAM; + } else { + return USER_MESSAGE; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java new file mode 100644 index 0000000..aa25742 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.message; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * The WatermarkMessage is a control message that is sent out to next stage + * with a watermark timestamp and the task that produces the watermark. + */ +public class WatermarkMessage extends ControlMessage { + private final long timestamp; + + @JsonCreator + public WatermarkMessage(@JsonProperty("timestamp") long timestamp, + @JsonProperty("task-name") String taskName, + @JsonProperty("task-count") int taskCount) { + super(taskName, taskCount); + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java new file mode 100644 index 0000000..26ef92c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import java.util.Arrays; +import org.apache.samza.SamzaException; +import org.apache.samza.message.EndOfStreamMessage; +import org.apache.samza.message.MessageType; +import org.apache.samza.message.WatermarkMessage; +import org.codehaus.jackson.type.TypeReference; + + +/** + * This class provides serialization/deserialization of the intermediate messages. + * + * The message format of an intermediate stream is below: + * + * IntermediateStreamMessage: { + * MessageType : int8 + * MessageData : byte[] + * } + * + * MessageType: [0(UserMessage), 1(Watermark), 2(EndOfStream)] + * MessageData: [UserMessage/ControlMessage] + * ControlMessage: + * Version : int + * TaskName : string + * TaskCount : int + * Other Message Data (based on different types of control message) + * + * For user message, we use the user message serde. + * For control message, we use json serde. 
+ */ +public class IntermediateMessageSerde implements Serde<Object> { + + private static final class WatermarkSerde extends JsonSerde<WatermarkMessage> { + @Override + public WatermarkMessage fromBytes(byte[] bytes) { + try { + return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<WatermarkMessage>() { }); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } + + private static final class EndOfStreamSerde extends JsonSerde<EndOfStreamMessage> { + @Override + public EndOfStreamMessage fromBytes(byte[] bytes) { + try { + return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<EndOfStreamMessage>() { }); + } catch (Exception e) { + throw new SamzaException(e); + } + } + } + + private final Serde userMessageSerde; + private final Serde<WatermarkMessage> watermarkSerde; + private final Serde<EndOfStreamMessage> eosSerde; + + public IntermediateMessageSerde(Serde userMessageSerde) { + this.userMessageSerde = userMessageSerde; + this.watermarkSerde = new WatermarkSerde(); + this.eosSerde = new EndOfStreamSerde(); + } + + @Override + public Object fromBytes(byte[] bytes) { + try { + final Object object; + final MessageType type = MessageType.values()[bytes[0]]; + final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length); + switch (type) { + case USER_MESSAGE: + object = userMessageSerde.fromBytes(data); + break; + case WATERMARK: + object = watermarkSerde.fromBytes(data); + break; + case END_OF_STREAM: + object = eosSerde.fromBytes(data); + break; + default: + throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name())); + } + return object; + + } catch (UnsupportedOperationException ue) { + throw new SamzaException(ue); + } catch (Exception e) { + // This will catch the following exceptions: + // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception + // 2) the first byte happens to be a valid type, but the deserialization fails with certain 
exception + // For these cases, we fall back to user-provided serde + return userMessageSerde.fromBytes(bytes); + } + } + + @Override + public byte[] toBytes(Object object) { + final byte [] data; + final MessageType type = MessageType.of(object); + switch (type) { + case USER_MESSAGE: + data = userMessageSerde.toBytes(object); + break; + case WATERMARK: + data = watermarkSerde.toBytes((WatermarkMessage) object); + break; + case END_OF_STREAM: + data = eosSerde.toBytes((EndOfStreamMessage) object); + break; + default: + throw new SamzaException("Unknown message type: " + type.name()); + } + + final byte [] bytes = new byte[data.length + 1]; + bytes[0] = (byte) type.ordinal(); + System.arraycopy(data, 0, bytes, 1, data.length); + + return bytes; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 389a883..20192fb 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -36,14 +36,16 @@ object StreamConfig { val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default" val BOOTSTRAP = SAMZA_PROPERTY + "bootstrap" val PRIORITY = SAMZA_PROPERTY + "priority" + val IS_INTERMEDIATE = SAMZA_PROPERTY + "intermediate" // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values. private val STREAMS_PREFIX = "streams." private val STREAM_PREFIX = "systems.%s.streams.%s." - protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." - protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM + val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." 
+ val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME + val IS_INTERMEDIATE_FROM_STREAM_ID = STREAM_ID_PREFIX + IS_INTERMEDIATE implicit def Config2Stream(config: Config) = new StreamConfig(config) } @@ -152,6 +154,15 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { } /** + * Gets the boolean flag of whether the specified streamId is an intermediate stream + * @param streamId the identifier for the stream in the config. + * @return true if the stream is intermediate + */ + def getIsIntermediate(streamId: String) = { + getBoolean(StreamConfig.IS_INTERMEDIATE_FROM_STREAM_ID format streamId, false) + } + + /** * Gets the stream IDs of all the streams defined in the config * @return collection of stream IDs */ @@ -259,7 +270,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { getStreamIds().filter(streamId => system.equals(getSystem(streamId))) } - private def systemStreamToStreamId(systemStream: SystemStream): String = { + def systemStreamToStreamId(systemStream: SystemStream): String = { val streamIds = getStreamIdsForSystem(systemStream.getSystem).filter(streamId => systemStream.getStream().equals(getPhysicalName(streamId))) if (streamIds.size > 1) { throw new IllegalStateException("There was more than one stream found for system stream %s" format(systemStream)) @@ -276,7 +287,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { * A streamId is translated to a SystemStream by looking up its System and physicalName. It * will use the streamId as the stream name if the physicalName doesn't exist. 
*/ - private def streamIdToSystemStream(streamId: String): SystemStream = { + def streamIdToSystemStream(streamId: String): SystemStream = { new SystemStream(getSystem(streamId), getPhysicalName(streamId)) } http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index e1d7352..4f5df94 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -25,6 +25,8 @@ import java.util import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import java.net.{URL, UnknownHostException} +import org.apache.samza.serializers.IntermediateMessageSerde +import org.apache.samza.serializers.StringSerde import org.apache.samza.{SamzaContainerStatus, SamzaException} import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job @@ -287,13 +289,38 @@ object SamzaContainer extends Logging { info("Got change log system streams: %s" format changeLogSystemStreams) + val intermediateStreams = config + .getStreamIds + .filter(config.getIsIntermediate(_)) + .toList + + info("Got intermediate streams: %s" format intermediateStreams) + + val controlMessageKeySerdes = intermediateStreams + .flatMap(streamId => { + val systemStream = config.streamIdToSystemStream(streamId) + systemStreamKeySerdes.get(systemStream) + .orElse(systemKeySerdes.get(systemStream.getSystem)) + .map(serde => (systemStream, new StringSerde("UTF-8"))) + }).toMap + + val intermediateStreamMessageSerdes = intermediateStreams + .flatMap(streamId => { + val systemStream = 
config.streamIdToSystemStream(streamId) + systemStreamMessageSerdes.get(systemStream) + .orElse(systemMessageSerdes.get(systemStream.getSystem)) + .map(serde => (systemStream, new IntermediateMessageSerde(serde))) + }).toMap + val serdeManager = new SerdeManager( serdes = serdes, systemKeySerdes = systemKeySerdes, systemMessageSerdes = systemMessageSerdes, systemStreamKeySerdes = systemStreamKeySerdes, systemStreamMessageSerdes = systemStreamMessageSerdes, - changeLogSystemStreams = changeLogSystemStreams.values.toSet) + changeLogSystemStreams = changeLogSystemStreams.values.toSet, + controlMessageKeySerdes = controlMessageKeySerdes, + intermediateMessageSerdes = intermediateStreamMessageSerdes) info("Setting up JVM metrics.") http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala index 4540bce..0100c78 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala @@ -20,7 +20,8 @@ package org.apache.samza.serializers import org.apache.samza.SamzaException -import org.apache.samza.config.SerializerConfig +import org.apache.samza.message.ControlMessage +import org.apache.samza.message.WatermarkMessage import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.IncomingMessageEnvelope @@ -32,7 +33,9 @@ class SerdeManager( systemMessageSerdes: Map[String, Serde[Object]] = Map(), systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(), systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(), - changeLogSystemStreams: Set[SystemStream] = Set()) { + changeLogSystemStreams: 
Set[SystemStream] = Set(), + controlMessageKeySerdes: Map[SystemStream, Serde[String]] = Map(), + intermediateMessageSerdes: Map[SystemStream, Serde[Object]] = Map()) { def toBytes(obj: Object, serializerName: String) = serdes .getOrElse(serializerName, throw new SamzaException("No serde defined for %s" format serializerName)) @@ -43,6 +46,10 @@ class SerdeManager( || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. envelope.getKey + } else if (envelope.getMessage.isInstanceOf[ControlMessage] + && controlMessageKeySerdes.contains(envelope.getSystemStream)) { + // If the message is a control message and the key needs to serialize + controlMessageKeySerdes(envelope.getSystemStream).toBytes(envelope.getKey.asInstanceOf[String]) } else if (envelope.getKeySerializerName != null) { // If a serde is defined, use it. toBytes(envelope.getKey, envelope.getKeySerializerName) @@ -61,6 +68,9 @@ class SerdeManager( || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. envelope.getMessage + } else if (intermediateMessageSerdes.contains(envelope.getSystemStream)) { + // If the stream is an intermediate stream, use the intermediate message serde + intermediateMessageSerdes(envelope.getSystemStream).toBytes(envelope.getMessage) } else if (envelope.getMessageSerializerName != null) { // If a serde is defined, use it. 
toBytes(envelope.getMessage, envelope.getMessageSerializerName) @@ -93,34 +103,43 @@ class SerdeManager( .fromBytes(bytes) def fromBytes(envelope: IncomingMessageEnvelope) = { - val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream) - || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) { + val systemStream = envelope.getSystemStreamPartition.getSystemStream + + val message = if (changeLogSystemStreams.contains(systemStream) + || systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. - envelope.getKey - } else if (systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) { + envelope.getMessage + } else if (intermediateMessageSerdes.contains(systemStream)) { + // If the stream is an intermediate stream, use the intermediate message serde + intermediateMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]]) + } else if (systemStreamMessageSerdes.contains(systemStream)) { // If the stream has a serde defined, use it. - systemStreamKeySerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]]) - } else if (systemKeySerdes.contains(envelope.getSystemStreamPartition.getSystem)) { + systemStreamMessageSerdes(systemStream).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]]) + } else if (systemMessageSerdes.contains(systemStream.getSystem)) { // If the system has a serde defined, use it. - systemKeySerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]]) + systemMessageSerdes(systemStream.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]]) } else { // Just use the object. 
- envelope.getKey + envelope.getMessage } - val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream) - || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { + val key = if (changeLogSystemStreams.contains(systemStream) + || systemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. - envelope.getMessage - } else if (systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) { + envelope.getKey + } else if (message.isInstanceOf[ControlMessage] + && controlMessageKeySerdes.contains(systemStream)) { + // If the message is a control message and the key needs to deserialize + controlMessageKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]]) + } else if (systemStreamKeySerdes.contains(systemStream)) { // If the stream has a serde defined, use it. - systemStreamMessageSerdes(envelope.getSystemStreamPartition).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]]) - } else if (systemMessageSerdes.contains(envelope.getSystemStreamPartition.getSystem)) { + systemStreamKeySerdes(systemStream).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]]) + } else if (systemKeySerdes.contains(systemStream.getSystem)) { // If the system has a serde defined, use it. - systemMessageSerdes(envelope.getSystemStreamPartition.getSystem).fromBytes(envelope.getMessage.asInstanceOf[Array[Byte]]) + systemKeySerdes(systemStream.getSystem).fromBytes(envelope.getKey.asInstanceOf[Array[Byte]]) } else { // Just use the object. 
- envelope.getMessage + envelope.getKey } if ((key eq envelope.getKey) && (message eq envelope.getMessage)) { http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java new file mode 100644 index 0000000..5b76bba --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.serializers.model.serializers;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import org.apache.samza.message.EndOfStreamMessage;
+import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.message.MessageType;
+import org.apache.samza.serializers.IntermediateMessageSerde;
+import org.apache.samza.serializers.Serde;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip tests for {@link IntermediateMessageSerde} with user, watermark and end-of-stream messages. */
+public class TestIntermediateMessageSerde {
+
+  /** Java-serialization serde wrapped by the serde under test; errors propagate instead of yielding null. */
+  static final class ObjectSerde implements Serde<Object> {
+
+    @Override
+    public Object fromBytes(byte[] bytes) {
+      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+        return ois.readObject();
+      } catch (IOException | ClassNotFoundException e) {
+        throw new IllegalStateException("Failed to deserialize test object", e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(Object object) {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      // Close the object stream before reading the buffer so all buffered block data is flushed.
+      try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+        oos.writeObject(object);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to serialize test object", e);
+      }
+      return baos.toByteArray();
+    }
+  }
+
+  static final class TestUserMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final String message;
+    private final long timestamp;
+    private final int offset;
+
+    public TestUserMessage(String message, int offset, long timestamp) {
+      this.message = message;
+      this.offset = offset;
+      this.timestamp = timestamp;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public int getOffset() {
+      return offset;
+    }
+  }
+
+  @Test
+  public void testUserMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String msg = "this is a test message";
+    long timestamp = System.currentTimeMillis();
+    TestUserMessage userMessage = new TestUserMessage(msg, 0, timestamp);
+    byte[] bytes = imserde.toBytes(userMessage);
+    TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes);
+    assertEquals(MessageType.USER_MESSAGE, MessageType.of(de));
+    assertEquals(msg, de.getMessage());
+    assertEquals(0, de.getOffset());
+    assertEquals(timestamp, de.getTimestamp());
+  }
+
+  @Test
+  public void testWatermarkMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String taskName = "task-1";
+    long timestamp = System.currentTimeMillis();
+    WatermarkMessage watermark = new WatermarkMessage(timestamp, taskName, 8);
+    byte[] bytes = imserde.toBytes(watermark);
+    WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes);
+    assertEquals(MessageType.WATERMARK, MessageType.of(de));
+    assertEquals(taskName, de.getTaskName());
+    assertEquals(8, de.getTaskCount());
+    assertEquals(timestamp, de.getTimestamp());
+  }
+
+  @Test
+  public void testEndOfStreamMessageSerde() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    String taskName = "task-1";
+    EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8);
+    byte[] bytes = imserde.toBytes(eos);
+    EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes);
+    assertEquals(MessageType.END_OF_STREAM, MessageType.of(de));
+    assertEquals(taskName, de.getTaskName());
+    assertEquals(8, de.getTaskCount());
+    assertEquals(1, de.getVersion());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3375c711/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 646d0c8..9d808cb 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++
b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -19,7 +19,10 @@
 
 package org.apache.samza.serializers
 
+
+import org.apache.samza.message.EndOfStreamMessage
+import org.apache.samza.message.WatermarkMessage
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemStream
@@ -42,4 +45,59 @@ class TestSerdeManager {
     val deserialized = new SerdeManager().fromBytes(original)
     assertSame(original, deserialized)
   }
+
+  @Test
+  def testIntermediateMessageSerde {
+    val output = new SystemStream("my-system", "output")
+    val intermediate = new SystemStream("my-system", "intermediate")
+    val intSerde = (new IntegerSerde).asInstanceOf[Serde[Object]]
+    val systemStreamKeySerdes: Map[SystemStream, Serde[Object]] = Map(output -> intSerde)
+    val systemStreamMessageSerdes: Map[SystemStream, Serde[Object]] = Map(output -> intSerde)
+    val controlMessageKeySerdes: Map[SystemStream, Serde[String]] = Map(intermediate -> new StringSerde("UTF-8"))
+    val intermediateMessageSerdes: Map[SystemStream, Serde[Object]] = Map(intermediate -> new IntermediateMessageSerde(intSerde))
+
+    val serdeManager = new SerdeManager(systemStreamKeySerdes = systemStreamKeySerdes,
+      systemStreamMessageSerdes = systemStreamMessageSerdes,
+      controlMessageKeySerdes = controlMessageKeySerdes,
+      intermediateMessageSerdes = intermediateMessageSerdes)
+
+    // user message sent to an output stream: the stream's key/message serdes apply
+    val outputEnvelope = roundTrip(serdeManager, output, 1, 1000)
+    assertEquals(1, outputEnvelope.getKey)
+    assertEquals(1000, outputEnvelope.getMessage)
+
+    // user message sent to an intermediate stream: the message goes through IntermediateMessageSerde
+    val intermediateEnvelope = roundTrip(serdeManager, intermediate, 1, 1000)
+    assertEquals(1, intermediateEnvelope.getKey)
+    assertEquals(1000, intermediateEnvelope.getMessage)
+
+    // end-of-stream control message sent to an intermediate stream: the key goes through controlMessageKeySerdes
+    val taskName = "task 1"
+    val taskCount = 8
+    val eosEnvelope = roundTrip(serdeManager, intermediate, "eos", new EndOfStreamMessage(taskName, taskCount))
+    assertEquals("eos", eosEnvelope.getKey)
+    val eosMsg = eosEnvelope.getMessage.asInstanceOf[EndOfStreamMessage]
+    assertEquals(taskName, eosMsg.getTaskName)
+    assertEquals(taskCount, eosMsg.getTaskCount)
+
+    // watermark control message sent to an intermediate stream
+    val timestamp = System.currentTimeMillis()
+    val watermarkEnvelope = roundTrip(serdeManager, intermediate, "watermark", new WatermarkMessage(timestamp, taskName, taskCount))
+    assertEquals("watermark", watermarkEnvelope.getKey)
+    val watermarkMsg = watermarkEnvelope.getMessage.asInstanceOf[WatermarkMessage]
+    assertEquals(timestamp, watermarkMsg.getTimestamp)
+    assertEquals(taskName, watermarkMsg.getTaskName)
+    assertEquals(taskCount, watermarkMsg.getTaskCount)
+  }
+
+  /**
+   * Serializes a message bound for the given stream with toBytes, then feeds the resulting bytes back
+   * through fromBytes of the same SerdeManager (partition 0, dummy offset) and returns that result.
+   */
+  private def roundTrip(serdeManager: SerdeManager, systemStream: SystemStream, key: Any, message: Any) = {
+    val serialized = serdeManager.toBytes(new OutgoingMessageEnvelope(systemStream, key, message))
+    val incoming = new IncomingMessageEnvelope(new SystemStreamPartition(systemStream, new Partition(0)), "offset",
+      serialized.getKey, serialized.getMessage)
+    serdeManager.fromBytes(incoming)
+  }
 }
\ No newline at end of file
