Merge branch 'master' into 0.14.0
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/052a0570 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/052a0570 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/052a0570 Branch: refs/heads/master Commit: 052a0570cf0f1a1020faef7c3695c0e86c6f348c Parents: a1f0144 f16ba26 Author: Xinyu Liu <[email protected]> Authored: Tue Oct 3 15:09:41 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Tue Oct 3 15:09:41 2017 -0700 ---------------------------------------------------------------------- build.gradle | 12 +- .../versioned/jobs/configuration-table.html | 27 +- .../java/org/apache/samza/operators/KV.java | 48 +++ .../apache/samza/operators/MessageStream.java | 34 +- .../apache/samza/operators/OutputStream.java | 6 +- .../org/apache/samza/operators/StreamGraph.java | 67 ++- .../org/apache/samza/serializers/Serde.java | 6 +- .../samza/system/SystemProducerException.java | 38 ++ .../ByteBufferSerde.scala | 48 +++ .../ByteSerde.scala | 36 ++ .../DoubleSerde.scala | 45 ++ .../IntegerSerde.scala | 45 ++ .../JsonSerdeV2.scala | 91 ++++ .../org.apache.samza.serializers/KVSerde.scala | 55 +++ .../LongSerde.scala | 45 ++ .../NoOpSerde.scala | 37 ++ .../SerializableSerde.scala | 67 +++ .../StringSerde.scala | 49 +++ .../UUIDSerde.scala | 47 ++ .../TestByteBufferSerde.scala | 53 +++ .../TestByteSerde.scala | 38 ++ .../TestDoubleSerde.scala | 40 ++ .../TestIntegerSerde.scala | 37 ++ .../TestJsonSerdeV2.scala | 45 ++ .../TestLongSerde.scala | 40 ++ .../TestSerializableSerde.scala | 45 ++ .../TestStringSerde.scala | 37 ++ .../TestUUIDSerde.scala | 53 +++ .../apache/samza/config/JavaSystemConfig.java | 14 +- .../samza/config/JobCoordinatorConfig.java | 9 + .../coordinator/CoordinationUtilsFactory.java | 10 - .../org/apache/samza/execution/JobNode.java | 92 +++- .../samza/operators/MessageStreamImpl.java | 25 +- .../apache/samza/operators/StreamGraphImpl.java | 127 +++--- 
.../samza/operators/impl/InputOperatorImpl.java | 16 +- .../samza/operators/impl/OperatorImpl.java | 36 +- .../samza/operators/impl/OperatorImplGraph.java | 27 +- .../operators/impl/OutputOperatorImpl.java | 58 +-- .../operators/impl/PartitionByOperatorImpl.java | 108 +++++ .../operators/impl/WindowOperatorImpl.java | 3 +- .../samza/operators/spec/InputOperatorSpec.java | 37 +- .../samza/operators/spec/OperatorSpec.java | 20 +- .../samza/operators/spec/OperatorSpecs.java | 26 +- .../operators/spec/OutputOperatorSpec.java | 11 +- .../samza/operators/spec/OutputStreamImpl.java | 28 +- .../operators/spec/PartitionByOperatorSpec.java | 81 ++++ .../stream/IntermediateMessageStreamImpl.java | 17 +- .../samza/runtime/LocalApplicationRunner.java | 5 +- .../serializers/IntermediateMessageSerde.java | 39 +- .../apache/samza/task/StreamOperatorTask.java | 4 +- .../apache/samza/config/SerializerConfig.scala | 6 +- .../org/apache/samza/config/SystemConfig.scala | 16 +- .../org/apache/samza/config/TaskConfig.scala | 3 + .../apache/samza/container/SamzaContainer.scala | 59 ++- .../samza/serializers/ByteBufferSerde.scala | 48 --- .../apache/samza/serializers/ByteSerde.scala | 36 -- .../apache/samza/serializers/DoubleSerde.scala | 45 -- .../apache/samza/serializers/IntegerSerde.scala | 45 -- .../apache/samza/serializers/JsonSerde.scala | 32 +- .../apache/samza/serializers/LongSerde.scala | 45 -- .../serializers/MetricsSnapshotSerde.scala | 4 +- .../samza/serializers/SerializableSerde.scala | 67 --- .../apache/samza/serializers/StringSerde.scala | 44 -- .../apache/samza/serializers/UUIDSerde.scala | 47 -- .../main/scala/org/apache/samza/util/Util.scala | 15 +- .../samza/config/TestJavaSystemConfig.java | 35 +- .../apache/samza/example/BroadcastExample.java | 18 +- .../samza/example/KeyValueStoreExample.java | 26 +- .../org/apache/samza/example/MergeExample.java | 20 +- .../samza/example/OrderShipmentJoinExample.java | 28 +- .../samza/example/PageViewCounterExample.java | 13 +- 
.../samza/example/RepartitionExample.java | 22 +- .../org/apache/samza/example/WindowExample.java | 8 +- .../samza/execution/TestExecutionPlanner.java | 59 ++- .../execution/TestJobGraphJsonGenerator.java | 35 +- .../org/apache/samza/execution/TestJobNode.java | 112 +++++ .../samza/operators/TestJoinOperator.java | 21 +- .../samza/operators/TestMessageStreamImpl.java | 69 ++- .../samza/operators/TestStreamGraphImpl.java | 407 ++++++++++++++++-- .../samza/operators/TestWindowOperator.java | 17 +- .../operators/impl/TestOperatorImplGraph.java | 88 +++- .../operators/impl/TestStreamOperatorImpl.java | 4 +- .../runtime/TestLocalApplicationRunner.java | 22 +- .../apache/samza/config/TestSystemConfig.scala | 67 +++ .../samza/container/TestTaskInstance.scala | 35 ++ .../samza/serializers/TestByteBufferSerde.scala | 53 --- .../samza/serializers/TestByteSerde.scala | 38 -- .../samza/serializers/TestDoubleSerde.scala | 40 -- .../samza/serializers/TestIntegerSerde.scala | 37 -- .../samza/serializers/TestLongSerde.scala | 40 -- .../serializers/TestSerializableSerde.scala | 45 -- .../samza/serializers/TestStringSerde.scala | 37 -- .../samza/serializers/TestUUIDSerde.scala | 53 --- .../scala/org/apache/samza/util/TestUtil.scala | 13 + .../org/apache/samza/config/KafkaConfig.scala | 34 +- .../samza/system/kafka/KafkaSystemFactory.scala | 5 +- .../system/kafka/KafkaSystemProducer.scala | 256 ++++++----- .../samza/system/kafka/MockKafkaProducer.java | 87 ++-- .../kafka/TestKafkaSystemProducerJava.java | 2 +- .../apache/samza/config/TestKafkaConfig.scala | 24 +- .../system/kafka/TestKafkaSystemProducer.scala | 427 ++++++++++++++++--- .../storage/kv/TestKeyValueStorageEngine.scala | 14 +- .../apache/samza/config/Log4jSystemConfig.java | 2 +- .../samza/logging/log4j/StreamAppender.java | 2 +- samza-test/src/main/resources/log4j.xml | 44 +- .../EndOfStreamIntegrationTest.java | 15 +- .../WatermarkIntegrationTest.java | 8 +- .../apache/samza/test/operator/PageView.java | 37 +- 
.../test/operator/RepartitionWindowApp.java | 28 +- .../samza/test/operator/SessionWindowApp.java | 23 +- .../test/operator/TestRepartitionWindowApp.java | 20 +- .../samza/test/operator/TumblingWindowApp.java | 24 +- .../test/processor/IdentityStreamTask.java | 16 +- .../test/processor/TestStreamProcessor.java | 149 ++++--- .../processor/TestZkLocalApplicationRunner.java | 36 +- .../test/integration/StreamTaskTestUtil.scala | 1 + .../test/integration/TestStatefulTask.scala | 2 +- 117 files changed, 3580 insertions(+), 1634 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/execution/JobNode.java index a86e019,fea42f2..7ff43ed --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@@ -123,8 -132,11 +132,11 @@@ public class JobNode configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); // write input/output streams to configs - inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs)); + inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig())); + // write serialized serde instances and stream serde configs to configs + addSerdeConfigs(configs); + log.info("Job {} has generated configs {}", jobName, configs); String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java 
---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index eefd4eb,e353ac4..0c50630 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@@ -18,16 -18,9 +18,13 @@@ */ package org.apache.samza.operators.impl; - import java.util.Collection; - import java.util.Collections; - import java.util.HashSet; - import java.util.Set; + import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MetricsConfig; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.container.TaskName; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; @@@ -39,14 -29,20 +36,23 @@@ import org.apache.samza.task.MessageCol import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.HighResolutionClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.Set; + + /** * Abstract base class for all stream operator implementations. 
+ * + * @param <M> type of the input to this operator + * @param <RM> type of the results of applying this operator */ public abstract class OperatorImpl<M, RM> { + private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class); private static final String METRICS_GROUP = OperatorImpl.class.getName(); private boolean initialized; @@@ -151,14 -133,9 +169,16 @@@ long endNs = this.highResClock.nanoTime(); this.handleMessageNs.update(endNs - startNs); - results.forEach(rm -> this.registeredOperators.forEach(op -> op.onMessage(rm, collector, coordinator))); + results.forEach(rm -> + this.registeredOperators.forEach(op -> - op.onMessage(rm, collector, coordinator))); ++ op.onMessage(rm, collector, coordinator))); + + WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn(); + if (watermarkFn != null) { + // check whether there is new watermark emitted from the user function + Long outputWm = watermarkFn.getOutputWatermark(); + propagateWatermark(outputWm, collector, coordinator); + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 9b747bc,faedfc9..1f86975 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@@ -18,14 -18,9 +18,14 @@@ */ package org.apache.samza.operators.impl; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import java.util.stream.Collectors; - import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.job.model.JobModel; + import org.apache.samza.operators.KV; import 
org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; @@@ -272,70 -249,4 +275,68 @@@ public class OperatorImplGraph } }; } + + private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) { + return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet()); + } + + /** + * calculate the task count that produces to each intermediate streams + * @param streamToConsumerTasks input streams to task mapping + * @param intermediateToInputStreams intermediate stream to input streams mapping + * @return mapping from intermediate stream to task count + */ + static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams( + Multimap<SystemStream, String> streamToConsumerTasks, + Multimap<SystemStream, SystemStream> intermediateToInputStreams) { + Map<SystemStream, Integer> result = new HashMap<>(); + intermediateToInputStreams.asMap().entrySet().forEach(entry -> { + result.put(entry.getKey(), + entry.getValue().stream() + .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream()) + .collect(Collectors.toSet()).size()); + }); + return result; + } + + /** + * calculate the mapping from input streams to consumer tasks + * @param jobModel JobModel object + * @return mapping from input stream to tasks + */ + static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) { + Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create(); + jobModel.getContainers().values().forEach(containerModel -> { + containerModel.getTasks().values().forEach(taskModel -> { + taskModel.getSystemStreamPartitions().forEach(ssp -> { + streamToConsumerTasks.put(ssp.getSystemStream(), taskModel.getTaskName().getTaskName()); + }); + }); + }); + return streamToConsumerTasks; + } + + /** + * calculate the mapping from output streams to input streams + * @param streamGraph the 
user {@link StreamGraphImpl} instance + * @return mapping from output streams to input streams + */ + static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) { + Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create(); + streamGraph.getInputOperators().entrySet().stream() + .forEach( + entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams)); + return outputToInputStreams; + } + + private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec, + Multimap<SystemStream, SystemStream> outputToInputStreams) { - if (opSpec instanceof OutputOperatorSpec) { - OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec; - if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) { - outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input); - } ++ if (opSpec instanceof PartitionByOperatorSpec) { ++ PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec; ++ outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input); + } else { + Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs(); + nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams)); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java index 0000000,072b31d..28b8dba mode 000000,100644..100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java @@@ -1,0 -1,82 +1,108 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.operators.impl; + + import org.apache.samza.SamzaException; + import org.apache.samza.config.Config; ++import org.apache.samza.container.TaskContextImpl; + import org.apache.samza.operators.KV; + import org.apache.samza.operators.spec.OperatorSpec; + import org.apache.samza.operators.spec.OutputStreamImpl; + import org.apache.samza.operators.spec.PartitionByOperatorSpec; ++import org.apache.samza.system.ControlMessage; ++import org.apache.samza.system.EndOfStreamMessage; + import org.apache.samza.system.OutgoingMessageEnvelope; ++import org.apache.samza.system.StreamMetadataCache; + import org.apache.samza.system.SystemStream; ++import org.apache.samza.system.WatermarkMessage; + import org.apache.samza.task.MessageCollector; + import org.apache.samza.task.TaskContext; + import org.apache.samza.task.TaskCoordinator; + + import java.util.Collection; + import java.util.Collections; + import java.util.function.Function; + + + /** + * An operator that sends sends messages to an output {@link SystemStream} for repartitioning them. 
+ */ + class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { + + private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec; + private final SystemStream systemStream; + private final Function<? super M, ? extends K> keyFunction; + private final Function<? super M, ? extends V> valueFunction; ++ private final String taskName; ++ private final ControlMessageSender controlMessageSender; + + PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) { + this.partitionByOpSpec = partitionByOpSpec; + OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream(); + if (!outputStream.isKeyedOutput()) { + throw new SamzaException("Output stream for repartitioning must be a keyed stream."); + } + this.systemStream = new SystemStream( + outputStream.getStreamSpec().getSystemName(), + outputStream.getStreamSpec().getPhysicalName()); + this.keyFunction = partitionByOpSpec.getKeyFunction(); + this.valueFunction = partitionByOpSpec.getValueFunction(); ++ this.taskName = context.getTaskName().getTaskName(); ++ StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache(); ++ this.controlMessageSender = new ControlMessageSender(streamMetadataCache); + } + + @Override + protected void handleInit(Config config, TaskContext context) { + } + + @Override + public Collection<Void> handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator) { + K key = keyFunction.apply(message); + V value = valueFunction.apply(message); + collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value)); + return Collections.emptyList(); + } + + @Override + protected void handleClose() { + } + + @Override + protected OperatorSpec<M, Void> getOperatorSpec() { + return partitionByOpSpec; + } ++ ++ @Override ++ protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { ++ sendControlMessage(new 
EndOfStreamMessage(taskName), collector); ++ } ++ ++ @Override ++ protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) { ++ sendControlMessage(new WatermarkMessage(watermark, taskName), collector); ++ return watermark; ++ } ++ ++ private void sendControlMessage(ControlMessage message, MessageCollector collector) { ++ SystemStream outputStream = partitionByOpSpec.getOutputStream().getStreamSpec().toSystemStream(); ++ controlMessageSender.send(message, outputStream, collector); ++ } + } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 773f742,2749245..3c66ee6 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@@ -18,12 -18,10 +18,11 @@@ */ package org.apache.samza.operators.spec; - import org.apache.commons.lang3.tuple.Pair; + import org.apache.samza.operators.KV; + import org.apache.samza.serializers.Serde; +import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.StreamSpec; - import java.util.function.BiFunction; - /** * The spec for an operator that receives incoming messages from an input stream * and converts them to the input message. 
@@@ -47,12 -49,15 +50,20 @@@ public class InputOperatorSpec<K, V> ex return this.streamSpec; } - public BiFunction<K, V, M> getMsgBuilder() { - return this.msgBuilder; + public Serde<K> getKeySerde() { + return keySerde; + } + + public Serde<V> getValueSerde() { + return valueSerde; + } + + public boolean isKeyedInput() { + return isKeyedInput; } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; - } ++ } } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 4047d92,bcb0485..71a9897 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@@ -19,7 -19,8 +19,9 @@@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.WatermarkFunction; + import org.apache.samza.operators.MessageStream; + import org.apache.samza.operators.MessageStreamImpl; import java.util.Collection; import java.util.LinkedHashSet; http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index 9759392,fc88634..862370f --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@@ -51,12 -46,7 +48,12 @@@ public class OutputOperatorSpec<M> exte * The {@link OutputStreamImpl} that this operator is sending its output to. * @return the {@link OutputStreamImpl} for this operator if any, else null. 
*/ - public OutputStreamImpl<?, ?, M> getOutputStream() { + public OutputStreamImpl<M> getOutputStream() { return this.outputStream; } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index 0000000,a2bb5f2..42eeb4b mode 000000,100644..100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@@ -1,0 -1,76 +1,81 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.operators.spec; + + import org.apache.samza.operators.KV; ++import org.apache.samza.operators.functions.WatermarkFunction; + + import java.util.function.Function; + + + /** + * The spec for an operator that re-partitions a {@link org.apache.samza.operators.MessageStream} to a + * {@link org.apache.samza.system.SystemStream}. 
This is usually paired with a corresponding + * {@link InputOperatorSpec} that consumes the {@link org.apache.samza.system.SystemStream} again. + * <p> + * This is a terminal operator and does not allow further operator chaining. + * + * @param <M> the type of message + * @param <K> the type of key in the message + * @param <V> the type of value in the message + */ + public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> { + + private final OutputStreamImpl<KV<K, V>> outputStream; + private final Function<? super M, ? extends K> keyFunction; + private final Function<? super M, ? extends V> valueFunction; + + /** + * Constructs an {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream} + * + * @param outputStream the {@link OutputStreamImpl} to send messages to + * @param keyFunction the {@link Function} for extracting the key from the message + * @param valueFunction the {@link Function} for extracting the value from the message + * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph + */ + PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream, + Function<? super M, ? extends K> keyFunction, + Function<? super M, ? extends V> valueFunction, int opId) { + super(OpCode.PARTITION_BY, opId); + this.outputStream = outputStream; + this.keyFunction = keyFunction; + this.valueFunction = valueFunction; + } + + /** + * The {@link OutputStreamImpl} that this operator is sending its output to. + * @return the {@link OutputStreamImpl} for this operator if any, else null. + */ + public OutputStreamImpl<KV<K, V>> getOutputStream() { + return this.outputStream; + } + + public Function<? super M, ? extends K> getKeyFunction() { + return keyFunction; + } + + public Function<? super M, ? 
extends V> getValueFunction() { + return valueFunction; + } + ++ @Override ++ public WatermarkFunction getWatermarkFn() { ++ return null; ++ } + } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java index 2ed559f,45ce9aa..61735ad --- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java @@@ -20,11 -20,13 +20,13 @@@ package org.apache.samza.serializers; import java.util.Arrays; + import org.apache.samza.SamzaException; -import org.apache.samza.message.EndOfStreamMessage; -import org.apache.samza.message.MessageType; -import org.apache.samza.message.WatermarkMessage; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.MessageType; +import org.apache.samza.system.WatermarkMessage; - import org.codehaus.jackson.type.TypeReference; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; /** http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --cc samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 0074e24,d7c2742..87af392 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@@ -18,12 -18,10 +18,12 @@@ */ package org.apache.samza.task; - import org.apache.commons.lang3.tuple.Pair; 
import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.system.EndOfStreamMessage; +import org.apache.samza.system.MessageType; import org.apache.samza.operators.ContextManager; + import org.apache.samza.operators.KV; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; @@@ -111,21 -105,7 +111,21 @@@ public final class StreamOperatorTask i SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream(); InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream); if (inputOpImpl != null) { - inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator); + switch (MessageType.of(ime.getMessage())) { + case USER_MESSAGE: - inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator); ++ inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator); + break; + + case END_OF_STREAM: + EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage(); + inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator); + break; + + case WATERMARK: + WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage(); + inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator); + break; + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --cc samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java index 0000000,c59c0cc..918da26 mode 000000,100644..100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@@ -1,0 -1,111 +1,112 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + package org.apache.samza.execution; + + import org.apache.samza.config.Config; + import org.apache.samza.config.MapConfig; + import org.apache.samza.config.SerializerConfig; + import org.apache.samza.operators.KV; + import org.apache.samza.operators.MessageStream; + import org.apache.samza.operators.OutputStream; + import org.apache.samza.operators.StreamGraphImpl; + import org.apache.samza.runtime.ApplicationRunner; + import org.apache.samza.serializers.JsonSerdeV2; + import org.apache.samza.serializers.KVSerde; + import org.apache.samza.serializers.Serde; + import org.apache.samza.serializers.SerializableSerde; + import org.apache.samza.serializers.StringSerde; + import org.apache.samza.system.StreamSpec; + import org.junit.Test; + + import java.util.Base64; + import java.util.HashMap; + import java.util.Map; + import java.util.stream.Collectors; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + import static org.mockito.Mockito.doReturn; + import static org.mockito.Mockito.mock; + + public class TestJobNode { + + @Test + public void testAddSerdeConfigs() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec inputSpec = new StreamSpec("input", "input", "input-system"); + StreamSpec outputSpec = new StreamSpec("output", "output", "output-system"); + StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system"); + doReturn(inputSpec).when(mockRunner).getStreamSpec("input"); + doReturn(outputSpec).when(mockRunner).getStreamSpec("output"); + doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1"); + + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); + MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input"); + OutputStream<KV<String, Object>> output = 
streamGraph.getOutputStream("output"); + input.partitionBy(KV::getKey, KV::getValue).sendTo(output); + + JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class)); - StreamEdge inputEdge = new StreamEdge(inputSpec); - StreamEdge outputEdge = new StreamEdge(outputSpec); - StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true); ++ Config config = new MapConfig(); ++ StreamEdge inputEdge = new StreamEdge(inputSpec, config); ++ StreamEdge outputEdge = new StreamEdge(outputSpec, config); ++ StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config); + jobNode.addInEdge(inputEdge); + jobNode.addOutEdge(outputEdge); + jobNode.addInEdge(repartitionEdge); + jobNode.addOutEdge(repartitionEdge); + + Map<String, String> configs = new HashMap<>(); + jobNode.addSerdeConfigs(configs); + + MapConfig mapConfig = new MapConfig(configs); + Config serializers = mapConfig.subset("serializers.registry.", true); + + // make sure that the serializers deserialize correctly + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap( + e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), + e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) + )); + assertEquals(2, serializers.size()); + + String inputKeySerde = mapConfig.get("streams.input.samza.key.serde"); + String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde"); + assertTrue(deserializedSerdes.containsKey(inputKeySerde)); + assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(inputMsgSerde)); + assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String outputKeySerde = mapConfig.get("streams.output.samza.key.serde"); + String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde"); + 
assertTrue(deserializedSerdes.containsKey(outputKeySerde)); + assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(outputMsgSerde)); + assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde"); + String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde"); + assertTrue(deserializedSerdes.containsKey(partitionByKeySerde)); + assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde)); + assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + } + + } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --cc samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 9fab1b7,68b4ce0..d73c545 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@@ -19,24 -19,9 +19,24 @@@ package org.apache.samza.operators.impl; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; - 
import org.apache.commons.lang3.tuple.Pair; +import org.apache.samza.Partition; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskContextImpl; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; + import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; @@@ -45,9 -30,12 +45,14 @@@ import org.apache.samza.operators.funct import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.runtime.ApplicationRunner; + import org.apache.samza.serializers.IntegerSerde; + import org.apache.samza.serializers.KVSerde; + import org.apache.samza.serializers.NoOpSerde; ++import org.apache.samza.serializers.Serde; + import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@@ -121,6 -105,43 +123,47 @@@ public class TestOperatorImplGraph } @Test + public void testPartitionByChain() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); + when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system")); + when(mockRunner.getStreamSpec(eq("null-null-partition_by-1"))) + .thenReturn(new StreamSpec("intermediate", "intermediate-stream", 
"intermediate-system")); + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + MessageStream<Object> inputStream = streamGraph.getInputStream("input"); + OutputStream<KV<Integer, String>> outputStream = streamGraph + .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + + inputStream + .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))) + .sendTo(outputStream); + - TaskContext mockTaskContext = mock(TaskContext.class); ++ TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); ++ when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); ++ JobModel jobModel = mock(JobModel.class); ++ when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP); ++ when(mockTaskContext.getJobModel()).thenReturn(jobModel); + OperatorImplGraph opImplGraph = + new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); + + InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); + assertEquals(1, inputOpImpl.registeredOperators.size()); + + OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next(); + assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator + assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode()); + + InputOperatorImpl repartitionedInputOpImpl = + opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream")); + assertEquals(1, repartitionedInputOpImpl.registeredOperators.size()); + + OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next(); + assertEquals(0, sendToOpImpl.registeredOperators.size()); + assertEquals(OpCode.SEND_TO, 
sendToOpImpl.getOperatorSpec().getOpCode()); + } + + @Test public void testBroadcastChain() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); @@@ -173,11 -194,11 +216,11 @@@ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); JoinFunction mockJoinFunction = mock(JoinFunction.class); - MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v); - MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v); + MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>()); + MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>()); inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1)); - TaskContext mockTaskContext = mock(TaskContext.class); + TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); @@@ -272,144 -293,4 +315,143 @@@ } }; } + + @Test + public void testGetStreamToConsumerTasks() { + String system = "test-system"; + String stream0 = "test-stream-0"; + String stream1 = "test-stream-1"; + + SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0)); + + TaskName task0 = new TaskName("Task 0"); + TaskName task1 = new TaskName("Task 1"); + Set<SystemStreamPartition> ssps = new HashSet<>(); + ssps.add(ssp0); + ssps.add(ssp2); + TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0)); + ContainerModel cm0 = new ContainerModel("c0", 0, 
Collections.singletonMap(task0, tm0)); + TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)); + ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)); + + Map<String, ContainerModel> cms = new HashMap<>(); + cms.put(cm0.getProcessorId(), cm0); + cms.put(cm1.getProcessorId(), cm1); + + JobModel jobModel = new JobModel(new MapConfig(), cms, null); + Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel); + assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2); + assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1); + } + + @Test + public void testGetOutputToInputStreams() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(JobConfig.JOB_NAME(), "test-app"); + configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); + Config config = new MapConfig(configMap); + + /** + * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. 
+ * + * input1 -> map -> join -> partitionBy (10) -> output1 + * | + * input2 -> filter -| + * | + * input3 -> filter -> partitionBy -> map -> join -> output2 + * + */ + StreamSpec input1 = new StreamSpec("input1", "input1", "system1"); + StreamSpec input2 = new StreamSpec("input2", "input2", "system2"); + StreamSpec input3 = new StreamSpec("input3", "input3", "system2"); + + StreamSpec output1 = new StreamSpec("output1", "output1", "system1"); + StreamSpec output2 = new StreamSpec("output2", "output2", "system2"); + + ApplicationRunner runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("input1")).thenReturn(input1); + when(runner.getStreamSpec("input2")).thenReturn(input2); + when(runner.getStreamSpec("input3")).thenReturn(input3); + when(runner.getStreamSpec("output1")).thenReturn(output1); + when(runner.getStreamSpec("output2")).thenReturn(output2); + + // intermediate streams used in tests + StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system"); + StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system"); + when(runner.getStreamSpec("test-app-1-partition_by-10")) + .thenReturn(int1); + when(runner.getStreamSpec("test-app-1-partition_by-6")) + .thenReturn(int2); + + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - BiFunction msgBuilder = mock(BiFunction.class); - MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn); - - m1.join(m2, 
mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha").sendTo(om1); ++ Serde inputSerde = new NoOpSerde<>(); ++ MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m -> m); ++ MessageStream m2 = streamGraph.getInputStream("input2", inputSerde).filter(m -> true); ++ MessageStream m3 = streamGraph.getInputStream("input3", inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m); ++ OutputStream<Object> om1 = streamGraph.getOutputStream("output1"); ++ OutputStream<Object> om2 = streamGraph.getOutputStream("output2"); ++ ++ m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha", m -> m).sendTo(om1); + m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2); + + Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph); + Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream()); + assertEquals(inputs.size(), 2); + assertTrue(inputs.contains(input1.toSystemStream())); + assertTrue(inputs.contains(input2.toSystemStream())); + + inputs = outputToInput.get(int2.toSystemStream()); + assertEquals(inputs.size(), 1); + assertEquals(inputs.iterator().next(), input3.toSystemStream()); + } + + @Test + public void testGetProducerTaskCountForIntermediateStreams() { + /** + * the task assignment looks like the following: + * + * input1 -----> task0, task1 -----> int1 + * ^ + * input2 ------> task1, task2--------| + * v + * input3 ------> task1 -----------> int2 + * + */ + + SystemStream input1 = new SystemStream("system1", "intput1"); + SystemStream input2 = new SystemStream("system2", "intput2"); + SystemStream input3 = new SystemStream("system2", "intput3"); + + SystemStream int1 = new SystemStream("system1", "int1"); + SystemStream int2 = new SystemStream("system1", "int2"); + + + String task0 = "Task 0"; + String task1 = "Task 1"; + String task2 = "Task 2"; + + Multimap<SystemStream, String> 
streamToConsumerTasks = HashMultimap.create(); + streamToConsumerTasks.put(input1, task0); + streamToConsumerTasks.put(input1, task1); + streamToConsumerTasks.put(input2, task1); + streamToConsumerTasks.put(input2, task2); + streamToConsumerTasks.put(input3, task1); + streamToConsumerTasks.put(int1, task0); + streamToConsumerTasks.put(int1, task1); + streamToConsumerTasks.put(int2, task0); + + Multimap<SystemStream, SystemStream> intermediateToInputStreams = HashMultimap.create(); + intermediateToInputStreams.put(int1, input1); + intermediateToInputStreams.put(int1, input2); + + intermediateToInputStreams.put(int2, input2); + intermediateToInputStreams.put(int2, input3); + + Map<SystemStream, Integer> counts = OperatorImplGraph.getProducerTaskCountForIntermediateStreams( + streamToConsumerTasks, intermediateToInputStreams); + assertTrue(counts.get(int1) == 3); + assertTrue(counts.get(int2) == 2); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --cc samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index d2094b4,1816380..a23e513 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@@ -20,9 -20,12 +20,12 @@@ package org.apache.samza.runtime; import com.google.common.collect.ImmutableList; - import java.util.*; -import java.lang.reflect.Field; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; ++import java.util.Set; import java.util.stream.Collectors; - import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; @@@ -49,8 -55,6 +54,7 @@@ import static org.junit.Assert.assertNo import static 
org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.doReturn; - import static org.powermock.api.mockito.PowerMockito.mockStatic; @RunWith(PowerMockRunner.class) @@@ -72,20 -76,44 +76,21 @@@ public class TestLocalApplicationRunne StreamApplication app = mock(StreamApplication.class); doNothing().when(app).init(anyObject(), anyObject()); - ExecutionPlanner planner = mock(ExecutionPlanner.class); - Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner"); - plannerField.setAccessible(true); - plannerField.set(runner, planner); - StreamManager streamManager = mock(StreamManager.class); - Field streamManagerField = runner.getClass().getSuperclass().getDeclaredField("streamManager"); - streamManagerField.setAccessible(true); - streamManagerField.set(runner, streamManager); - ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class); + doReturn(streamManager).when(runner).getStreamManager(); - ExecutionPlan plan = new ExecutionPlan() { - @Override - public List<JobConfig> getJobConfigs() { - return Collections.emptyList(); - } - - @Override - public List<StreamSpec> getIntermediateStreams() { - return Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")); - } - - @Override - public String getPlanAsJson() - throws Exception { - return ""; - } - }; - when(planner.plan(anyObject())).thenReturn(plan); + ExecutionPlan plan = mock(ExecutionPlan.class); + when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"))); + when(plan.getPlanAsJson()).thenReturn(""); + doReturn(plan).when(runner).getExecutionPlan(any(), any()); - mockStatic(CoordinationUtilsFactory.class); CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class); - 
when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory); + JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class); + when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory); + PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig); - LocalApplicationRunner spy = spy(runner); try { - spy.run(app); + runner.run(app); } catch (Throwable t) { assertNotNull(t); //no jobs exception } http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --cc samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index 8493cf1,0000000..d2f0184 mode 100644,000000..100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@@ -1,103 -1,0 +1,112 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.controlmessages; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; ++import org.apache.samza.operators.KV; ++import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory; +import org.apache.samza.test.controlmessages.TestData.PageView; +import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import org.apache.samza.test.util.ArraySystemFactory; +import org.apache.samza.test.util.Base64Serializer; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +/** + * This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input. + * It verifies the pipeline will stop and the number of output messages should equal to the input. 
+ */ +public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { + + private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"}; + + @Test + public void testPipeline() throws Exception { + Random random = new Random(); + int count = 10; + PageView[] pageviews = new PageView[count]; + for (int i = 0; i < count; i++) { + String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; + int memberId = random.nextInt(10); + pageviews[i] = new PageView(pagekey, memberId); + } + + int partitionCount = 4; + Map<String, String> configs = new HashMap<>(); + configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName()); + configs.put("streams.PageView.samza.system", "test"); + configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews)); + configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); + + configs.put(JobConfig.JOB_NAME(), "test-eos-job"); + configs.put(JobConfig.PROCESSOR_ID(), "1"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + + configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); + configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); + configs.put("systems.kafka.samza.key.serde", "int"); + configs.put("systems.kafka.samza.msg.serde", "json"); + configs.put("systems.kafka.default.stream.replication.factor", "1"); + configs.put("job.default.system", "kafka"); + + configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory"); + configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName()); + + final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); + List<PageView> received = new ArrayList<>(); + final StreamApplication app = (streamGraph, cfg) -> { - streamGraph.getInputStream("PageView", (k, v) -> (PageView) v) - .partitionBy(PageView::getMemberId) ++ streamGraph.<KV<String, PageView>>getInputStream("PageView") ++ .map(Values.create()) ++ .partitionBy(pv -> pv.getMemberId(), pv -> pv) + .sink((m, collector, coordinator) -> { - received.add(m); ++ received.add(m.getValue()); + }); + }; + runner.run(app); + runner.waitForFinish(); + + assertEquals(received.size(), count * partitionCount); + } ++ ++ public static final class Values { ++ public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() { ++ return (M m) -> m.getValue(); ++ } ++ } +} http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --cc samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index d9202d3,0000000..7da0e77 mode 100644,000000..100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@@ -1,204 -1,0 +1,206 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.controlmessages; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.TaskInstance; +import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.metrics.MetricsRegistry; ++import org.apache.samza.operators.KV; +import org.apache.samza.operators.impl.InputOperatorImpl; +import org.apache.samza.operators.impl.OperatorImpl; +import org.apache.samza.operators.impl.OperatorImplGraph; +import org.apache.samza.operators.impl.TestOperatorImpl; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.processor.StreamProcessor; +import org.apache.samza.processor.TestStreamProcessorUtil; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.runtime.TestLocalApplicationRunner; +import org.apache.samza.serializers.IntegerSerdeFactory; +import org.apache.samza.serializers.StringSerdeFactory; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; +import 
org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 +import org.apache.samza.system.IncomingMessageEnvelope;
 +import org.apache.samza.system.SystemAdmin;
 +import org.apache.samza.system.SystemConsumer;
 +import org.apache.samza.system.SystemFactory;
 +import org.apache.samza.system.SystemProducer;
 +import org.apache.samza.system.SystemStreamPartition;
 +import org.apache.samza.task.AsyncStreamTaskAdapter;
 +import org.apache.samza.task.StreamOperatorTask;
 +import org.apache.samza.task.TestStreamOperatorTask;
 +import org.apache.samza.test.controlmessages.TestData.PageView;
 +import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 +import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 +import org.apache.samza.test.util.SimpleSystemAdmin;
 +import org.apache.samza.test.util.TestStreamConsumer;
 +import org.junit.Test;
 +import scala.collection.JavaConverters;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +
 +/**
 + * Integration test checking the watermarks observed by the partitionBy and sink operators
 + * of a two-partition pipeline.
 + *
 + * <p>Input comes from an in-memory "test" system whose consumer replays {@link #TEST_DATA};
 + * the job's default system is configured as Kafka (NOTE(review): presumably the repartitioned
 + * intermediate stream is therefore created in Kafka — confirm).
 + */
 +public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
 +
 +  // Counter used to give every synthetic envelope a distinct offset (see createIncomingMessage).
 +  private static int offset = 1;
 +  private static final String TEST_SYSTEM = "test";
 +  private static final String TEST_STREAM = "PageView";
 +  private static final int PARTITION_COUNT = 2;
 +  private static final SystemStreamPartition SSP0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
 +  private static final SystemStreamPartition SSP1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
 +
 +  /*
 +   * Replayed input, in order. Partition 0 receives watermarks 1 then 4; partition 1 receives
 +   * watermarks 2 then 3. Each partition gets two page views and ends with an end-of-stream
 +   * envelope so the job can finish.
 +   */
 +  private final static List<IncomingMessageEnvelope> TEST_DATA = new ArrayList<>();
 +  static {
 +    TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3));
 +    TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1));
 +  }
 +
 +  /**
 +   * System factory for the in-memory "test" system: the consumer replays {@link #TEST_DATA},
 +   * no producer is provided (returns {@code null}), and the admin is a {@link SimpleSystemAdmin}.
 +   */
 +  public final static class TestSystemFactory implements SystemFactory {
 +    @Override
 +    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
 +      return new TestStreamConsumer(TEST_DATA);
 +    }
 +
 +    @Override
 +    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
 +      return null;
 +    }
 +
 +    @Override
 +    public SystemAdmin getAdmin(String systemName, Config config) {
 +      return new SimpleSystemAdmin(config);
 +    }
 +  }
 +
 +  /**
 +   * Wraps {@code message} in an envelope for {@code ssp}, using the next value of the
 +   * static {@code offset} counter as its offset and an empty string as its key.
 +   */
 +  private static IncomingMessageEnvelope createIncomingMessage(Object message, SystemStreamPartition ssp) {
 +    return new IncomingMessageEnvelope(ssp, String.valueOf(offset++), "", message);
 +  }
 +
 +  /**
 +   * Runs PageView -> map -> partitionBy -> sink to completion and checks the input/output
 +   * watermarks of the partitionBy and sink operators in each task.
 +   *
 +   * <p>Expected: each task's partitionBy operator sits at the last watermark its own input
 +   * partition received (4 for partition 0, 3 for partition 1), while both sink operators sit
 +   * at 3 (NOTE(review): presumably the minimum over the upstream tasks feeding the
 +   * repartitioned stream — confirm against the watermark aggregation logic).
 +   */
 +  @Test
 +  public void testWatermark() throws Exception {
 +    Map<String, String> configs = new HashMap<>();
 +    configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
 +    configs.put("streams.PageView.samza.system", "test");
 +    configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
 +
 +    configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
 +    configs.put(JobConfig.PROCESSOR_ID(), "1");
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
 +    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 +
 +    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
 +    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
 +    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
 +    configs.put("systems.kafka.samza.key.serde", "int");
 +    configs.put("systems.kafka.samza.msg.serde", "json");
 +    configs.put("systems.kafka.default.stream.replication.factor", "1");
 +    configs.put("job.default.system", "kafka");
 +
 +    configs.put("serializers.registry.int.class", IntegerSerdeFactory.class.getName());
 +    configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
 +    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
 +
 +    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
 +    // Filled by the sink; this test only checks watermarks and does not assert on its contents.
 +    List<PageView> received = new ArrayList<>();
 +    final StreamApplication app = (streamGraph, cfg) -> {
 -      streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
 -        .partitionBy(PageView::getMemberId)
++      streamGraph.<KV<String, PageView>>getInputStream("PageView")
++        .map(EndOfStreamIntegrationTest.Values.create())
++        .partitionBy(pv -> pv.getMemberId(), pv -> pv)
 +        .sink((m, collector, coordinator) -> {
 -            received.add(m);
++            received.add(m.getValue());
 +          });
 +    };
 +    runner.run(app);
 +    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
 +
 +    runner.waitForFinish();
 +
 +    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are accurate.
 +    StreamOperatorTask task0 = tasks.get("Partition 0");
 +    OperatorImplGraph graph = TestStreamOperatorTask.getOperatorImplGraph(task0);
 +    OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(4, TestOperatorImpl.getInputWatermark(pb));
 +    assertEquals(4, TestOperatorImpl.getOutputWatermark(pb));
 +    OperatorImpl sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(3, TestOperatorImpl.getInputWatermark(sink));
 +    assertEquals(3, TestOperatorImpl.getOutputWatermark(sink));
 +
 +    StreamOperatorTask task1 = tasks.get("Partition 1");
 +    graph = TestStreamOperatorTask.getOperatorImplGraph(task1);
 +    pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(3, TestOperatorImpl.getInputWatermark(pb));
 +    assertEquals(3, TestOperatorImpl.getOutputWatermark(pb));
 +    sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(3, TestOperatorImpl.getInputWatermark(sink));
 +    assertEquals(3, TestOperatorImpl.getOutputWatermark(sink));
 +  }
 +
 +  /**
 +   * Returns the {@link StreamOperatorTask} of every task instance, keyed by task name
 +   * (e.g. "Partition 0").
 +   *
 +   * <p>Only the first StreamProcessor of the runner is inspected. The operator task is read
 +   * reflectively from the private {@code wrappedTask} field of {@link AsyncStreamTaskAdapter},
 +   * so this breaks if that field is renamed.
 +   */
 +  Map<String, StreamOperatorTask> getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception {
 +    StreamProcessor processor = TestLocalApplicationRunner.getProcessors(runner).iterator().next();
 +    SamzaContainer container = TestStreamProcessorUtil.getContainer(processor);
 +    Map<TaskName, TaskInstance> taskInstances = JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava();
 +    Map<String, StreamOperatorTask> tasks = new HashMap<>();
 +    for (Map.Entry<TaskName, TaskInstance> entry : taskInstances.entrySet()) {
 +      AsyncStreamTaskAdapter adapter = (AsyncStreamTaskAdapter) entry.getValue().task();
 +      Field field = AsyncStreamTaskAdapter.class.getDeclaredField("wrappedTask");
 +      field.setAccessible(true);
 +      StreamOperatorTask task = (StreamOperatorTask) field.get(adapter);
 +      tasks.put(entry.getKey().getTaskName(), task);
 +    }
 +    return tasks;
 +  }
 +
 +  /**
 +   * Returns the first operator with the given op code reachable from an input operator of
 +   * {@code graph}, or {@code null} if none is found.
 +   *
 +   * <p>From each input operator only the first successor is followed at every step, so this
 +   * is only reliable for linear (non-branching) pipelines like the one in this test.
 +   */
 +  OperatorImpl getOperator(OperatorImplGraph graph, OperatorSpec.OpCode opCode) {
 +    for (InputOperatorImpl input : graph.getAllInputOperators()) {
 +      Set<OperatorImpl> nextOps = TestOperatorImpl.getNextOperators(input);
 +      while (!nextOps.isEmpty()) {
 +        OperatorImpl op = nextOps.iterator().next();
 +        if (TestOperatorImpl.getOpCode(op) == opCode) {
 +          return op;
 +        } else {
 +          nextOps = TestOperatorImpl.getNextOperators(op);
 +        }
 +      }
 +    }
 +    return null;
 +  }
 +}
