http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 99496eb..9b747bc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -18,9 +18,14 @@ */ package org.apache.samza.operators.impl; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.job.model.JobModel; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; @@ -36,6 +41,8 @@ import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskContext; import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -50,6 +57,7 @@ import java.util.Map; * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s. */ public class OperatorImplGraph { + private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class); /** * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating @@ -84,10 +92,27 @@ public class OperatorImplGraph { */ public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) { this.clock = clock; + + TaskContextImpl taskContext = (TaskContextImpl) context; + Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(streamGraph) ? + getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()), + getIntermediateToInputStreamsMap(streamGraph)) : + Collections.EMPTY_MAP; + producerTaskCounts.forEach((stream, count) -> { + LOG.info("{} has {} producer tasks.", stream, count); + }); + + // set states for end-of-stream + taskContext.registerObject(EndOfStreamStates.class.getName(), + new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts)); + // set states for watermark + taskContext.registerObject(WatermarkStates.class.getName(), + new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts)); + streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> { SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName()); InputOperatorImpl inputOperatorImpl = - (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context); + (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context); this.inputOperators.put(systemStream, inputOperatorImpl); }); } @@ -128,17 +153,18 @@ public class OperatorImplGraph { * @return the operator implementation for the operatorSpec */ OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, - Config config, TaskContext context) { + SystemStream inputStream, Config config, TaskContext context) { if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) { // Either this is the first time we've seen this operatorSpec, or this is a join operator spec // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG. OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context); operatorImpl.init(config, context); + operatorImpl.registerInputStream(inputStream); operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl); Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { - OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context); + OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); operatorImpl.registerNextOperator(nextImpl); }); return operatorImpl; @@ -246,4 +272,70 @@ public class OperatorImplGraph { } }; } + + private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) { + return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet()); + } + + /** + * calculate the task count that produces to each intermediate streams + * @param streamToConsumerTasks input streams to task mapping + * @param intermediateToInputStreams intermediate stream to input streams mapping + * @return mapping from intermediate stream to task count + */ + static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams( + Multimap<SystemStream, String> streamToConsumerTasks, + Multimap<SystemStream, SystemStream> intermediateToInputStreams) { + Map<SystemStream, Integer> result = new HashMap<>(); + intermediateToInputStreams.asMap().entrySet().forEach(entry -> { + result.put(entry.getKey(), + entry.getValue().stream() + .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream()) + .collect(Collectors.toSet()).size()); + }); + return result; + } + + /** + * calculate the mapping from input streams to consumer tasks + * @param jobModel JobModel object + * @return mapping from input stream to tasks + */ + static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) { + Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create(); + jobModel.getContainers().values().forEach(containerModel -> { + containerModel.getTasks().values().forEach(taskModel -> { + taskModel.getSystemStreamPartitions().forEach(ssp -> { + streamToConsumerTasks.put(ssp.getSystemStream(), taskModel.getTaskName().getTaskName()); + }); + }); + }); + return streamToConsumerTasks; + } + + /** + * calculate the mapping from output streams to input streams + * @param streamGraph the user {@link StreamGraphImpl} instance + * @return mapping from output streams to input streams + */ + static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) { + Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create(); + streamGraph.getInputOperators().entrySet().stream() + .forEach( + entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams)); + return outputToInputStreams; + } + + private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec, + Multimap<SystemStream, SystemStream> outputToInputStreams) { + if (opSpec instanceof OutputOperatorSpec) { + OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec; + if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) { + outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input); + } + } else { + Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs(); + nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams)); + } + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java index f212b3e..205bba6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java @@ -21,12 +21,16 @@ package org.apache.samza.operators.impl; import java.util.Collection; import java.util.Collections; import org.apache.samza.config.Config; -import org.apache.samza.control.Watermark; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.system.ControlMessage; +import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.WatermarkMessage; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -39,10 +43,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> { private final OutputOperatorSpec<M> outputOpSpec; private final OutputStreamImpl<?, ?, M> outputStream; + private final String taskName; + private final ControlMessageSender controlMessageSender; OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) { this.outputOpSpec = outputOpSpec; this.outputStream = outputOpSpec.getOutputStream(); + this.taskName = context.getTaskName().getTaskName(); + + StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache(); + this.controlMessageSender = new ControlMessageSender(streamMetadataCache); } @Override @@ -71,12 +81,22 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> { } @Override - protected long handleWatermark(Watermark inputWatermark, - MessageCollector collector, - TaskCoordinator coordinator) { + protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) { - inputWatermark.propagate(outputStream.getStreamSpec().toSystemStream()); + sendControlMessage(new EndOfStreamMessage(taskName), collector); } - return inputWatermark.getTimestamp(); + } + + @Override + protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) { + if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) { + sendControlMessage(new WatermarkMessage(watermark, taskName), collector); + } + return watermark; + } + + private void sendControlMessage(ControlMessage message, MessageCollector collector) { + SystemStream outputStream = outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(); + controlMessageSender.send(message, outputStream, collector); } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java new file mode 100644 index 0000000..0295626 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.WatermarkMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages the watermarks coming from input/intermediate streams in a task. Internally it keeps track + * of the latest watermark timestamp from each upstream task, and use the min as the consolidated watermark time. + * + * This class is thread-safe. However, having parallelism within a task may result in out-of-order processing + * and inaccurate watermarks. In this scenario, watermarks might be emitted before the previous messages fully processed. + */ +class WatermarkStates { + private static final Logger LOG = LoggerFactory.getLogger(WatermarkStates.class); + + public static final long WATERMARK_NOT_EXIST = -1; + + private final static class WatermarkState { + private final int expectedTotal; + private final Map<String, Long> timestamps = new HashMap<>(); + private volatile long watermarkTime = WATERMARK_NOT_EXIST; + + WatermarkState(int expectedTotal) { + this.expectedTotal = expectedTotal; + } + + synchronized void update(long timestamp, String taskName) { + if (taskName != null) { + Long ts = timestamps.get(taskName); + if (ts != null && ts > timestamp) { + LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s", + timestamp, ts, taskName)); + } else { + timestamps.put(taskName, timestamp); + } + } + + /** + * Check whether we got all the watermarks. + * At a sources, the expectedTotal is 0. + * For any intermediate streams, the expectedTotal is the upstream task count. + */ + if (timestamps.size() == expectedTotal) { + Optional<Long> min = timestamps.values().stream().min(Long::compare); + watermarkTime = min.orElse(timestamp); + } + } + + long getWatermarkTime() { + return watermarkTime; + } + } + + private final Map<SystemStreamPartition, WatermarkState> watermarkStates; + + WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) { + Map<SystemStreamPartition, WatermarkState> states = new HashMap<>(); + ssps.forEach(ssp -> { + states.put(ssp, new WatermarkState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0))); + }); + this.watermarkStates = Collections.unmodifiableMap(states); + } + + /** + * Update the state upon receiving a watermark message. + * @param watermarkMessage message of {@link WatermarkMessage} + * @param ssp system stream partition + * @return true iff the stream has a new watermark + */ + void update(WatermarkMessage watermarkMessage, SystemStreamPartition ssp) { + WatermarkState state = watermarkStates.get(ssp); + if (state != null) { + state.update(watermarkMessage.getTimestamp(), watermarkMessage.getTaskName()); + } else { + LOG.error("SSP {} doesn't have watermark states", ssp); + } + } + + long getWatermark(SystemStream systemStream) { + return watermarkStates.entrySet().stream() + .filter(entry -> entry.getKey().getSystemStream().equals(systemStream)) + .map(entry -> entry.getValue().getWatermarkTime()) + .min(Long::compare) + .orElse(WATERMARK_NOT_EXIST); + } + + /* package private for testing */ + long getWatermarkPerSSP(SystemStreamPartition ssp) { + return watermarkStates.get(ssp).getWatermarkTime(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 6fbc3c1..773f742 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.StreamSpec; import java.util.function.BiFunction; @@ -49,4 +50,9 @@ public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> { public BiFunction<K, V, M> getMsgBuilder() { return this.msgBuilder; } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 16f59d7..f4fe0fd 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -70,4 +71,9 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { / public long getTtlMs() { return ttlMs; } + + @Override + public WatermarkFunction getWatermarkFn() { + return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index f64e123..4047d92 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.WatermarkFunction; import java.util.Collection; import java.util.LinkedHashSet; @@ -118,4 +119,6 @@ public abstract class OperatorSpec<M, OM> { public final String getOpName() { return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId()); } + + abstract public WatermarkFunction getWatermarkFn(); } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index e6767ec..9759392 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@ -19,6 +19,8 @@ package org.apache.samza.operators.spec; +import org.apache.samza.operators.functions.WatermarkFunction; + /** * The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a * {@link org.apache.samza.system.SystemStream}. @@ -52,4 +54,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> { public OutputStreamImpl<?, ?, M> getOutputStream() { return this.outputStream; } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 2b55d95..1145be8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -48,4 +49,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> { public SinkFunction<M> getSinkFn() { return this.sinkFn; } + + @Override + public WatermarkFunction getWatermarkFn() { + return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index 1f2f683..aace2e2 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -46,4 +47,9 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { public FlatMapFunction<M, OM> getTransformFn() { return this.transformFn; } + + @Override + public WatermarkFunction getWatermarkFn() { + return transformFn instanceof WatermarkFunction ? (WatermarkFunction) transformFn : null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 0937499..75f1427 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,6 +19,8 @@ package org.apache.samza.operators.spec; +import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.triggers.AnyTrigger; import org.apache.samza.operators.triggers.RepeatingTrigger; import org.apache.samza.operators.triggers.TimeBasedTrigger; @@ -109,4 +111,10 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK } return timeBasedTriggers; } + + @Override + public WatermarkFunction getWatermarkFn() { + FoldLeftFunction fn = window.getFoldLeftFunction(); + return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java index 0b98ec6..2ed559f 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java @@ -21,9 +21,9 @@ package org.apache.samza.serializers; import java.util.Arrays; import org.apache.samza.SamzaException; -import org.apache.samza.message.EndOfStreamMessage; -import org.apache.samza.message.IntermediateMessageType; -import org.apache.samza.message.WatermarkMessage; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.MessageType; +import org.apache.samza.system.WatermarkMessage; import org.codehaus.jackson.type.TypeReference; @@ -86,16 +86,16 @@ public class IntermediateMessageSerde implements Serde<Object> { public Object fromBytes(byte[] bytes) { try { final Object object; - final IntermediateMessageType type = IntermediateMessageType.values()[bytes[0]]; + final MessageType type = MessageType.values()[bytes[0]]; final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length); switch (type) { case USER_MESSAGE: object = userMessageSerde.fromBytes(data); break; - case WATERMARK_MESSAGE: + case WATERMARK: object = watermarkSerde.fromBytes(data); break; - case END_OF_STREAM_MESSAGE: + case END_OF_STREAM: object = eosSerde.fromBytes(data); break; default: @@ -117,15 +117,15 @@ public class IntermediateMessageSerde implements Serde<Object> { @Override public byte[] toBytes(Object object) { final byte [] data; - final IntermediateMessageType type = IntermediateMessageType.of(object); + final MessageType type = MessageType.of(object); switch (type) { case USER_MESSAGE: data = userMessageSerde.toBytes(object); break; - case WATERMARK_MESSAGE: + case WATERMARK: data = watermarkSerde.toBytes((WatermarkMessage) object); break; - case END_OF_STREAM_MESSAGE: + case END_OF_STREAM: data = eosSerde.toBytes((EndOfStreamMessage) object); break; default: http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java index e57a89f..e2fea95 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java @@ -21,11 +21,7 @@ package org.apache.samza.task; import java.util.concurrent.ExecutorService; import org.apache.samza.config.Config; -import org.apache.samza.control.ControlMessageListenerTask; -import org.apache.samza.control.Watermark; -import org.apache.samza.control.IOGraph; import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStream; /** @@ -34,7 +30,7 @@ import org.apache.samza.system.SystemStream; * the callbacks once it's done. If the thread pool is null, it follows the legacy * synchronous model to execute the tasks on the run loop thread. */ -public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask, ControlMessageListenerTask { +public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask { private final StreamTask wrappedTask; private final ExecutorService executor; @@ -100,20 +96,4 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi ((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, coordinator); } } - - @Override - public IOGraph getIOGraph() { - if (wrappedTask instanceof ControlMessageListenerTask) { - return ((ControlMessageListenerTask) wrappedTask).getIOGraph(); - } - return null; - } - - @Override - public void onWatermark(Watermark watermark, SystemStream stream, MessageCollector collector, TaskCoordinator coordinator) { - if (wrappedTask instanceof ControlMessageListenerTask) { - ((ControlMessageListenerTask) wrappedTask).onWatermark(watermark, stream, collector, coordinator); - } - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 16b7e40..0074e24 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -21,25 +21,28 @@ package org.apache.samza.task; import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; -import org.apache.samza.control.ControlMessageListenerTask; -import org.apache.samza.control.Watermark; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.MessageType; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; -import org.apache.samza.control.IOGraph; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.WatermarkMessage; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link StreamTask} implementation that brings all the operator API implementation components together and * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}. */ -public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask, ControlMessageListenerTask { +public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask { + private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class); private final StreamApplication streamApplication; private final ApplicationRunner runner; @@ -47,7 +50,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo private OperatorImplGraph operatorImplGraph; private ContextManager contextManager; - private IOGraph ioGraph; /** * Constructs an adaptor task to run the user-implemented {@link StreamApplication}. @@ -91,7 +93,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo // create the operator impl DAG corresponding to the logical operator spec DAG this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock); - this.ioGraph = streamGraph.toIOGraph(); } /** @@ -110,7 +111,21 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream(); InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream); if (inputOpImpl != null) { - inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator); + switch (MessageType.of(ime.getMessage())) { + case USER_MESSAGE: + inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator); + break; + + case END_OF_STREAM: + EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage(); + inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator); + break; + + case WATERMARK: + WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage(); + inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator); + break; + } } } @@ -121,22 +136,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo } @Override - public IOGraph getIOGraph() { - return ioGraph; - } - - @Override - public final void onWatermark(Watermark watermark, - SystemStream systemStream, - MessageCollector collector, - TaskCoordinator coordinator) { - InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream); - if (inputOpImpl != null) { - inputOpImpl.onWatermark(watermark, collector, coordinator); - } - } - - @Override public void close() throws Exception { if (this.contextManager != null) { this.contextManager.close(); http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 8c739d4..1b2ce80 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -24,6 +24,7 @@ package org.apache.samza.checkpoint import java.util.HashMap import java.util.concurrent.ConcurrentHashMap +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream @@ -193,7 +194,7 @@ class OffsetManager( */ def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) { lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]()) - if (offset != null) { + if (offset != null && !offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset) } } @@ -216,6 +217,10 @@ class OffsetManager( } } + def setStartingOffset(taskName: TaskName, ssp: SystemStreamPartition, offset: String): Unit = { + startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset)) + } + /** * Gets a snapshot of all the current offsets for the specified task. This is useful to * ensure there are no concurrent updates to the offsets between when this method is http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 5a19d90..acec365 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -20,25 +20,17 @@ package org.apache.samza.container -import com.google.common.collect.HashMultimap -import com.google.common.collect.Multimap import org.apache.samza.SamzaException import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.control.ControlMessageListenerTask -import org.apache.samza.control.ControlMessageUtils -import org.apache.samza.control.EndOfStreamManager -import org.apache.samza.control.WatermarkManager import org.apache.samza.job.model.JobModel -import org.apache.samza.message.MessageType import org.apache.samza.metrics.MetricsReporter import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.StreamMetadataCache import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.ClosableTask @@ -47,34 +39,11 @@ import org.apache.samza.task.InitableTask import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskCallbackFactory -import org.apache.samza.task.TaskContext import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.task.WindowableTask import org.apache.samza.util.Logging import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ - -object TaskInstance { - /** - * Build a map from a stream to its consumer tasks - * @param jobModel job model which contains ssp-to-task assignment - * @return the map of input stream to tasks - */ - def buildInputToTasks(jobModel: JobModel): Multimap[SystemStream, String] = { - val streamToTasks: Multimap[SystemStream, String] = HashMultimap.create[SystemStream, String] - if (jobModel != null) { - for (containerModel <- jobModel.getContainers.values) { - for (taskModel <- containerModel.getTasks.values) { - for (ssp <- taskModel.getSystemStreamPartitions) { - streamToTasks.put(ssp.getSystemStream, taskModel.getTaskName.toString) - } - } - } - } - return streamToTasks - } -} class TaskInstance( val task: Any, @@ -97,35 +66,10 @@ class TaskInstance( val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask] val isClosableTask = task.isInstanceOf[ClosableTask] val isAsyncTask = task.isInstanceOf[AsyncStreamTask] - val isControlMessageListener = task.isInstanceOf[ControlMessageListenerTask] - - val context = new TaskContext { - var userContext: Object = null; - def getMetricsRegistry = metrics.registry - def getSystemStreamPartitions = systemStreamPartitions.asJava - def getStore(storeName: String) = if (storageManager != null) { - storageManager(storeName) - } else { - warn("No store found for name: %s" format storeName) - null - } - def getTaskName = taskName - def getSamzaContainerContext = containerContext + val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager, + storageManager, jobModel, streamMetadataCache) - override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = { - val startingOffsets = offsetManager.startingOffsets - offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset)) - } - - override def setUserContext(context: Object): Unit = { - userContext = context - } - - override def getUserContext: Object = { - userContext - } - } // store the (ssp -> if this ssp is catched up) mapping. "catched up" // means the same ssp in other taskInstances have the same offset as // the one here. @@ -133,10 +77,6 @@ class TaskInstance( scala.collection.mutable.Map[SystemStreamPartition, Boolean]() systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false) - val inputToTasksMapping = TaskInstance.buildInputToTasks(jobModel) - var endOfStreamManager: EndOfStreamManager = null - var watermarkManager: WatermarkManager = null - val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_)) def registerMetrics { @@ -169,22 +109,6 @@ class TaskInstance( } else { debug("Skipping task initialization for taskName: %s" format taskName) } - - if (isControlMessageListener) { - endOfStreamManager = new EndOfStreamManager(taskName.getTaskName, - task.asInstanceOf[ControlMessageListenerTask], - inputToTasksMapping, - systemStreamPartitions.asJava, - streamMetadataCache, - collector) - - watermarkManager = new WatermarkManager(taskName.getTaskName, - task.asInstanceOf[ControlMessageListenerTask], - inputToTasksMapping, - systemStreamPartitions.asJava, - streamMetadataCache, - collector) - } } def registerProducers { @@ -223,51 +147,20 @@ class TaskInstance( trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) - MessageType.of(envelope.getMessage) match { - case MessageType.USER_MESSAGE => - if (isAsyncTask) { - exceptionHandler.maybeHandle { - val callback = callbackFactory.createCallback() - task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback) - } - } - else { - exceptionHandler.maybeHandle { - task.asInstanceOf[StreamTask].process(envelope, collector, coordinator) - } - - trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" - format(taskName, envelope.getSystemStreamPartition, envelope.getOffset)) - - offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset) - } + if (isAsyncTask) { + exceptionHandler.maybeHandle { + val callback = callbackFactory.createCallback() + task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback) + } + } else { + exceptionHandler.maybeHandle { + task.asInstanceOf[StreamTask].process(envelope, collector, coordinator) + } - case MessageType.END_OF_STREAM => - if (isControlMessageListener) { - // handle eos synchronously. - runSync(callbackFactory) { - endOfStreamManager.update(envelope, coordinator) - } - } else { - warn("Ignore end-of-stream message due to %s not implementing ControlMessageListener." - format(task.getClass.toString)) - } + trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" + format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) - case MessageType.WATERMARK => - if (isControlMessageListener) { - // handle watermark synchronously in the run loop thread. - // we might consider running it asynchronously later - runSync(callbackFactory) { - val watermark = watermarkManager.update(envelope) - if (watermark != null) { - val stream = envelope.getSystemStreamPartition.getSystemStream - task.asInstanceOf[ControlMessageListenerTask].onWatermark(watermark, stream, collector, coordinator) - } - } - } else { - warn("Ignore watermark message due to %s not implementing ControlMessageListener." - format(task.getClass.toString)) - } + offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset) } } } @@ -343,38 +236,32 @@ class TaskInstance( * it's already catched-up. */ private def checkCaughtUp(envelope: IncomingMessageEnvelope) = { - systemAdmins match { - case null => { - warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true - } - case others => { - val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition) - .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition)) - val system = envelope.getSystemStreamPartition.getSystem - others(system).offsetComparator(envelope.getOffset, startingOffset) match { - case null => { - info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable - } - case result => { - if (result >= 0) { - info(envelope.getSystemStreamPartition.toString + " is catched up.") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) { + ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + } else { + systemAdmins match { + case null => { + warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up") + ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + } + case others => { + val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition) + .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition)) + val system = envelope.getSystemStreamPartition.getSystem + others(system).offsetComparator(envelope.getOffset, startingOffset) match { + case null => { + info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up") + ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable + } + case result => { + if (result >= 0) { + info(envelope.getSystemStreamPartition.toString + " is catched up.") + ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + } } } } } } } - - private def runSync(callbackFactory: TaskCallbackFactory)(runCodeBlock: => Unit) = { - val callback = callbackFactory.createCallback() - try { - runCodeBlock - callback.complete() - } catch { - case t: Throwable => callback.failure(t) - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala index 0100c78..76594ae 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala @@ -20,12 +20,13 @@ package org.apache.samza.serializers import org.apache.samza.SamzaException -import org.apache.samza.message.ControlMessage -import org.apache.samza.message.WatermarkMessage +import org.apache.samza.system.ControlMessage import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.config.StorageConfig +import org.apache.samza.system.WatermarkMessage + class SerdeManager( serdes: Map[String, Serde[Object]] = Map(), http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java b/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java deleted file mode 100644 index 8351802..0000000 --- a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.control; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.samza.Partition; -import org.apache.samza.container.TaskName; -import org.apache.samza.message.ControlMessage; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamMetadata; -import org.apache.samza.task.MessageCollector; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestControlMessageUtils { - - @Test - public void testSendControlMessage() { - SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); - Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); - partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); - StreamMetadataCache metadataCache = mock(StreamMetadataCache.class); - when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); - - SystemStream systemStream = new SystemStream("test-system", "test-stream"); - Set<Integer> partitions = new HashSet<>(); - MessageCollector collector = mock(MessageCollector.class); - doAnswer(invocation -> { - OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0]; - partitions.add((Integer) envelope.getPartitionKey()); - assertEquals(envelope.getSystemStream(), systemStream); - return null; - }).when(collector).send(any()); - - ControlMessageUtils.sendControlMessage(mock(ControlMessage.class), systemStream, metadataCache, collector); - assertEquals(partitions.size(), 4); - } - - @Test - public void testCalculateUpstreamTaskCounts() { - SystemStream input1 = new SystemStream("test-system", "input-stream-1"); - SystemStream input2 = new SystemStream("test-system", "input-stream-2"); - SystemStream input3 = new SystemStream("test-system", "input-stream-3"); - - Multimap<SystemStream, String> inputToTasks = HashMultimap.create(); - TaskName t0 = new TaskName("task 0"); //consume input1 and input2 - TaskName t1 = new TaskName("task 1"); //consume input1 and input2 and input 3 - TaskName t2 = new TaskName("task 2"); //consume input2 and input 3 - inputToTasks.put(input1, t0.getTaskName()); - inputToTasks.put(input1, t1.getTaskName()); - inputToTasks.put(input2, t0.getTaskName()); - inputToTasks.put(input2, t1.getTaskName()); - inputToTasks.put(input2, t2.getTaskName()); - inputToTasks.put(input3, t1.getTaskName()); - inputToTasks.put(input3, t2.getTaskName()); - - StreamSpec inputSpec2 = new StreamSpec("input-stream-2", "input-stream-2", "test-system"); - StreamSpec inputSpec3 = new StreamSpec("input-stream-3", "input-stream-3", "test-system"); - StreamSpec intSpec1 = new StreamSpec("int-stream-1", "int-stream-1", "test-system"); - StreamSpec intSpec2 = new StreamSpec("int-stream-2", "int-stream-2", "test-system"); - - List<IOGraph.IONode> nodes = new ArrayList<>(); - IOGraph.IONode node = new IOGraph.IONode(intSpec1, true); - node.addInput(inputSpec2); - nodes.add(node); - node = new IOGraph.IONode(intSpec2, true); - node.addInput(inputSpec3); - nodes.add(node); - IOGraph ioGraph = new IOGraph(nodes); - - Map<SystemStream, Integer> counts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, ioGraph); - assertEquals(counts.get(intSpec1.toSystemStream()).intValue(), 3); - assertEquals(counts.get(intSpec2.toSystemStream()).intValue(), 2); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java deleted file mode 100644 index cc70b6b..0000000 --- a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.control; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.samza.Partition; -import org.apache.samza.container.TaskName; -import org.apache.samza.message.EndOfStreamMessage; -import org.apache.samza.operators.spec.OperatorSpecs; -import org.apache.samza.operators.spec.OutputOperatorSpec; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamMetadata; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -public class TestEndOfStreamManager { - StreamMetadataCache metadataCache; - - @Before - public void setup() { - SystemStreamMetadata metadata = mock(SystemStreamMetadata.class); - Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); - partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)); - when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); - metadataCache = mock(StreamMetadataCache.class); - when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); - } - - @Test - public void testUpdateFromInputSource() { - SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); - TaskName taskName = new TaskName("Task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); - EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null); - manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class)); - assertTrue(manager.isEndOfStream(ssp.getSystemStream())); - } - - @Test - public void testUpdateFromIntermediateStream() { - SystemStreamPartition[] ssps = new SystemStreamPartition[3]; - ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0)); - ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0)); - ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1)); - - TaskName taskName = new TaskName("Task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - for (SystemStreamPartition ssp : ssps) { - streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); - } - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList())); - EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null); - - int envelopeCount = 4; - IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount]; - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new EndOfStreamMessage("task " + i, envelopeCount)); - } - TaskCoordinator coordinator = mock(TaskCoordinator.class); - - // verify the first three messages won't result in end-of-stream - for (int i = 0; i < 3; i++) { - manager.update(envelopes[i], coordinator); - assertFalse(manager.isEndOfStream(ssps[0].getSystemStream())); - } - // the fourth message will end the stream - manager.update(envelopes[3], coordinator); - assertTrue(manager.isEndOfStream(ssps[0].getSystemStream())); - assertFalse(manager.isEndOfStream(ssps[1].getSystemStream())); - - // stream2 has two partitions assigned to this task, so it requires a message from each partition to end it - envelopes = new IncomingMessageEnvelope[envelopeCount]; - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount)); - } - // verify the messages for the partition 0 won't result in end-of-stream - for (int i = 0; i < 4; i++) { - manager.update(envelopes[i], coordinator); - assertFalse(manager.isEndOfStream(ssps[1].getSystemStream())); - } - for (int i = 0; i < envelopeCount; i++) { - envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount)); - } - for (int i = 0; i < 3; i++) { - manager.update(envelopes[i], coordinator); - assertFalse(manager.isEndOfStream(ssps[1].getSystemStream())); - } - // the fourth message will end the stream - manager.update(envelopes[3], coordinator); - assertTrue(manager.isEndOfStream(ssps[1].getSystemStream())); - } - - @Test - public void testUpdateFromIntermediateStreamWith2Tasks() { - SystemStreamPartition[] ssps0 = new SystemStreamPartition[2]; - ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0)); - ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0)); - - SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1)); - - TaskName t0 = new TaskName("Task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - for (SystemStreamPartition ssp : ssps0) { - streamToTasks.put(ssp.getSystemStream(), t0.getTaskName()); - } - - TaskName t1 = new TaskName("Task 1"); - streamToTasks.put(ssp1, t1.getTaskName()); - - List<StreamSpec> inputs = new ArrayList<>(); - inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system")); - inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system")); - StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system"); - IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true); - - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(ioGraph); - - EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null)); - manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class)); - assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream())); - doNothing().when(manager0).sendEndOfStream(any(), anyInt()); - manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class)); - assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream())); - verify(manager0).sendEndOfStream(any(), anyInt()); - - EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks, Collections.singleton( - ssp1), null, null)); - doNothing().when(manager1).sendEndOfStream(any(), anyInt()); - manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class)); - assertTrue(manager1.isEndOfStream(ssp1.getSystemStream())); - verify(manager1).sendEndOfStream(any(), anyInt()); - } - - @Test - public void testSendEndOfStream() { - StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system"); - StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system"); - IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true); - - Multimap<SystemStream, String> inputToTasks = HashMultimap.create(); - for (int i = 0; i < 8; i++) { - inputToTasks.put(input.toSystemStream(), "Task " + i); - } - - MessageCollector collector = mock(MessageCollector.class); - TaskName taskName = new TaskName("Task 0"); - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(ioGraph); - EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(), - listener, - inputToTasks, - Collections.EMPTY_SET, - metadataCache, - collector); - - Set<Integer> partitions = new HashSet<>(); - doAnswer(invocation -> { - OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0]; - partitions.add((Integer) envelope.getPartitionKey()); - EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage(); - assertEquals(eosMessage.getTaskName(), taskName.getTaskName()); - assertEquals(eosMessage.getTaskCount(), 8); - return null; - }).when(collector).send(any()); - - manager.sendEndOfStream(input.toSystemStream(), 8); - assertEquals(partitions.size(), 4); - } - - @Test - public void testPropagate() { - List<StreamSpec> inputs = new ArrayList<>(); - inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system")); - inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system")); - StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system"); - - SystemStream input1 = new SystemStream("test-system", "input-stream-1"); - SystemStream input2 = new SystemStream("test-system", "input-stream-2"); - SystemStream ints = new SystemStream("test-system", "int-stream"); - SystemStreamPartition[] ssps = new SystemStreamPartition[3]; - ssps[0] = new SystemStreamPartition(input1, new Partition(0)); - ssps[1] = new SystemStreamPartition(input2, new Partition(0)); - ssps[2] = new SystemStreamPartition(ints, new Partition(0)); - - Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps)); - TaskName taskName = new TaskName("task 0"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - for (SystemStreamPartition ssp : ssps) { - streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName()); - } - - IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true); - MessageCollector collector = mock(MessageCollector.class); - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(ioGraph); - EndOfStreamManager manager = spy( - new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector)); - TaskCoordinator coordinator = mock(TaskCoordinator.class); - - // ssp1 end-of-stream, wait for ssp2 - manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator); - verify(manager, never()).sendEndOfStream(any(), anyInt()); - - // ssp2 end-of-stream, propagate to intermediate - manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator); - doNothing().when(manager).sendEndOfStream(any(), anyInt()); - ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class); - verify(manager).sendEndOfStream(argument.capture(), anyInt()); - assertEquals(ints, argument.getValue()); - - // intermediate end-of-stream, shutdown the task - manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator); - doNothing().when(coordinator).shutdown(any()); - ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class); - verify(coordinator).shutdown(arg.capture()); - assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue()); - } - - // Test the case when the publishing tasks to intermediate stream is a subset of total tasks - @Test - public void testPropogateWith2Tasks() { - StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system"); - OutputStreamImpl outputStream = new OutputStreamImpl(outputSpec, null, null); - OutputOperatorSpec partitionByOp = OperatorSpecs.createPartitionByOperatorSpec(outputStream, 0); - - List<StreamSpec> inputs = new ArrayList<>(); - inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system")); - - IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true); - - SystemStream input1 = new SystemStream("test-system", "input-stream-1"); - SystemStream ints = new SystemStream("test-system", "int-stream"); - SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0)); - SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0)); - - TaskName t0 = new TaskName("task 0"); - TaskName t1 = new TaskName("task 1"); - Multimap<SystemStream, String> streamToTasks = HashMultimap.create(); - streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName()); - streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName()); - - ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class); - when(listener.getIOGraph()).thenReturn(ioGraph); - - EndOfStreamManager manager0 = spy( - new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks, Collections.singleton(ssp1), metadataCache, null)); - EndOfStreamManager manager1 = spy( - new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks, Collections.singleton(ssp2), metadataCache, null)); - - TaskCoordinator coordinator0 = mock(TaskCoordinator.class); - TaskCoordinator coordinator1 = mock(TaskCoordinator.class); - - // ssp1 end-of-stream - doNothing().when(manager0).sendEndOfStream(any(), anyInt()); - doNothing().when(coordinator0).shutdown(any()); - manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0); - //verify task count is 1 - ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class); - verify(manager0).sendEndOfStream(any(), argument.capture()); - assertTrue(argument.getValue() == 1); - ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class); - verify(coordinator0).shutdown(arg.capture()); - assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue()); - - // int1 end-of-stream - IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null, new EndOfStreamMessage(t0.getTaskName(), 1)); - manager1.update(intEos, coordinator1); - doNothing().when(coordinator1).shutdown(any()); - verify(manager1, never()).sendEndOfStream(any(), anyInt()); - arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class); - verify(coordinator1).shutdown(arg.capture()); - assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java deleted file mode 100644 index 39c56c3..0000000 --- a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.control; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.control.IOGraph.IONode; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.system.StreamSpec; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestIOGraph { - StreamSpec input1; - StreamSpec input2; - StreamSpec input3; - StreamSpec output1; - StreamSpec output2; - StreamSpec int1; - StreamSpec int2; - - StreamGraphImpl streamGraph; - - @Before - public void setup() { - ApplicationRunner runner = mock(ApplicationRunner.class); - Map<String, String> configMap = new HashMap<>(); - configMap.put(JobConfig.JOB_NAME(), "test-app"); - configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); - Config config = new MapConfig(configMap); - - /** - * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. - * - * input1 -> map -> join -> output1 - * | - * input2 -> partitionBy -> filter -| - * | - * input3 -> filter -> partitionBy -> map -> join -> output2 - * - */ - input1 = new StreamSpec("input1", "input1", "system1"); - input2 = new StreamSpec("input2", "input2", "system2"); - input3 = new StreamSpec("input3", "input3", "system2"); - - output1 = new StreamSpec("output1", "output1", "system1"); - output2 = new StreamSpec("output2", "output2", "system2"); - - runner = mock(ApplicationRunner.class); - when(runner.getStreamSpec("input1")).thenReturn(input1); - when(runner.getStreamSpec("input2")).thenReturn(input2); - when(runner.getStreamSpec("input3")).thenReturn(input3); - when(runner.getStreamSpec("output1")).thenReturn(output1); - when(runner.getStreamSpec("output2")).thenReturn(output2); - - // intermediate streams used in tests - int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"); - int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"); - when(runner.getStreamSpec("test-app-1-partition_by-3")) - .thenReturn(int1); - when(runner.getStreamSpec("test-app-1-partition_by-8")) - .thenReturn(int2); - - streamGraph = new StreamGraphImpl(runner, config); - BiFunction msgBuilder = mock(BiFunction.class); - MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn); - - m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1); - m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2); - } - - @Test - public void testBuildIOGraph() { - IOGraph ioGraph = streamGraph.toIOGraph(); - assertEquals(ioGraph.getNodes().size(), 4); - - for (IONode node : ioGraph.getNodes()) { - if (node.getOutput().equals(output1)) { - assertEquals(node.getInputs().size(), 2); - assertFalse(node.isOutputIntermediate()); - StreamSpec[] inputs = sort(node.getInputs()); - assertEquals(inputs[0], input1); - assertEquals(inputs[1], int1); - } else if (node.getOutput().equals(output2)) { - assertEquals(node.getInputs().size(), 2); - assertFalse(node.isOutputIntermediate()); - StreamSpec[] inputs = sort(node.getInputs()); - assertEquals(inputs[0], int1); - assertEquals(inputs[1], int2); - } else if (node.getOutput().equals(int1)) { - assertEquals(node.getInputs().size(), 1); - assertTrue(node.isOutputIntermediate()); - StreamSpec[] inputs = sort(node.getInputs()); - assertEquals(inputs[0], input2); - } else if (node.getOutput().equals(int2)) { - assertEquals(node.getInputs().size(), 1); - assertTrue(node.isOutputIntermediate()); - StreamSpec[] inputs = sort(node.getInputs()); - assertEquals(inputs[0], input3); - } - } - } - - @Test - public void testNodesOfInput() { - IOGraph ioGraph = streamGraph.toIOGraph(); - Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream()); - assertEquals(nodes.size(), 1); - IONode node = nodes.iterator().next(); - assertEquals(node.getOutput(), output1); - assertEquals(node.getInputs().size(), 2); - assertFalse(node.isOutputIntermediate()); - - nodes = ioGraph.getNodesOfInput(input2.toSystemStream()); - assertEquals(nodes.size(), 1); - node = nodes.iterator().next(); - assertEquals(node.getOutput(), int1); - assertEquals(node.getInputs().size(), 1); - assertTrue(node.isOutputIntermediate()); - - nodes = ioGraph.getNodesOfInput(int1.toSystemStream()); - assertEquals(nodes.size(), 2); - nodes.forEach(n -> { - assertEquals(n.getInputs().size(), 2); - }); - - nodes = ioGraph.getNodesOfInput(input3.toSystemStream()); - assertEquals(nodes.size(), 1); - node = nodes.iterator().next(); - assertEquals(node.getOutput(), int2); - assertEquals(node.getInputs().size(), 1); - assertTrue(node.isOutputIntermediate()); - - nodes = ioGraph.getNodesOfInput(int2.toSystemStream()); - assertEquals(nodes.size(), 1); - node = nodes.iterator().next(); - assertEquals(node.getOutput(), output2); - assertEquals(node.getInputs().size(), 2); - assertFalse(node.isOutputIntermediate()); - } - - private static StreamSpec[] sort(Set<StreamSpec> specs) { - StreamSpec[] array = new StreamSpec[specs.size()]; - specs.toArray(array); - Arrays.sort(array, (s1, s2) -> s1.getId().compareTo(s2.getId())); - return array; - } - - public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs, - StreamSpec output, - boolean isOutputIntermediate) { - IONode node = new IONode(output, isOutputIntermediate); - inputs.forEach(input -> node.addInput(input)); - return new IOGraph(Collections.singleton(node)); - } -}
