Repository: samza Updated Branches: refs/heads/master f7e173651 -> 92ae4c628
SAMZA-1219; Add metrics for operator message received and execution times Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish <[email protected]> Closes #142 from prateekm/operator-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/92ae4c62 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/92ae4c62 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/92ae4c62 Branch: refs/heads/master Commit: 92ae4c628abb3d113520ec47ca82f08c480123ad Parents: f7e1736 Author: Prateek Maheshwari <[email protected]> Authored: Thu Apr 27 15:55:11 2017 -0700 Committer: vjagadish1989 <[email protected]> Committed: Thu Apr 27 15:55:11 2017 -0700 ---------------------------------------------------------------------- .../samza/operators/impl/OperatorImpl.java | 136 +++++++++-- .../samza/operators/impl/OperatorImplGraph.java | 34 ++- .../operators/impl/PartialJoinOperatorImpl.java | 48 ++-- .../samza/operators/impl/RootOperatorImpl.java | 36 ++- .../impl/SessionWindowOperatorImpl.java | 52 ----- .../samza/operators/impl/SinkOperatorImpl.java | 25 +- .../operators/impl/StreamOperatorImpl.java | 27 ++- .../operators/impl/WindowOperatorImpl.java | 93 +++++--- .../samza/operators/spec/OperatorSpec.java | 14 +- .../operators/spec/PartialJoinOperatorSpec.java | 8 - .../samza/operators/spec/SinkOperatorSpec.java | 7 - .../operators/spec/StreamOperatorSpec.java | 7 - .../operators/spec/WindowOperatorSpec.java | 9 - .../apache/samza/task/StreamOperatorTask.java | 8 +- .../samza/operators/TestJoinOperator.java | 12 +- .../samza/operators/TestWindowOperator.java | 6 +- .../samza/operators/impl/TestOperatorImpl.java | 226 +++++++++++++++---- .../samza/operators/impl/TestOperatorImpls.java | 29 +-- .../operators/impl/TestSinkOperatorImpl.java | 7 +- .../operators/impl/TestStreamOperatorImpl.java | 22 +- 20 files changed, 547 insertions(+), 259 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index b9a606b..d547869 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -18,9 +18,19 @@ */ package org.apache.samza.operators.impl; +import org.apache.samza.config.Config; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.HighResolutionClock; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -29,60 +39,140 @@ import java.util.Set; * Abstract base class for all stream operator implementations. */ public abstract class OperatorImpl<M, RM> { + private static final String METRICS_GROUP = OperatorImpl.class.getName(); - private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>(); + private boolean initialized; + private Set<OperatorImpl<RM, ?>> registeredOperators; + private HighResolutionClock highResClock; + private Counter numMessage; + private Timer handleMessageNs; + private Timer handleTimerNs; /** - * Register the next operator in the chain that this operator should propagate its output to. - * @param nextOperator the next operator in the chain. 
+ * Initialize this {@link OperatorImpl} and its user-defined functions. + * + * @param config the {@link Config} for the task + * @param context the {@link TaskContext} for the task + */ + public final void init(Config config, TaskContext context) { + String opName = getOperatorSpec().getOpName(); + + if (initialized) { + throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName)); + } + + this.highResClock = createHighResClock(config); + registeredOperators = new HashSet<>(); + MetricsRegistry metricsRegistry = context.getMetricsRegistry(); + this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages"); + this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns"); + this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns"); + + handleInit(config, context); + + initialized = true; + } + + /** + * Initialize this {@link OperatorImpl} and its user-defined functions. + * + * @param config the {@link Config} for the task + * @param context the {@link TaskContext} for the task + */ + protected abstract void handleInit(Config config, TaskContext context); + + /** + * Register an operator that this operator should propagate its results to. + * + * @param nextOperator the next operator to propagate results to */ void registerNextOperator(OperatorImpl<RM, ?> nextOperator) { - nextOperators.add(nextOperator); + if (!initialized) { + throw new IllegalStateException( + String.format("Attempted to register next operator before initializing operator %s.", + getOperatorSpec().getOpName())); + } + this.registeredOperators.add(nextOperator); } /** - * Perform the transformation required for this operator and call the downstream operators. + * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators. 
+ * <p> + * Delegates to {@link #handleMessage(Object, MessageCollector, TaskCoordinator)} for handling the message. * - * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly. + * @param message the input message + * @param collector the {@link MessageCollector} for this message + * @param coordinator the {@link TaskCoordinator} for this message + */ + public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.numMessage.inc(); + long startNs = this.highResClock.nanoTime(); + Collection<RM> results = handleMessage(message, collector, coordinator); + long endNs = this.highResClock.nanoTime(); + this.handleMessageNs.update(endNs - startNs); + + results.forEach(rm -> + this.registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + } + + /** + * Handle the incoming {@code message} and return the results to be propagated to registered operators. * * @param message the input message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context + * @return results of the transformation */ - public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); + protected abstract Collection<RM> handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator); /** - * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)} + * Handle timer ticks for this {@link OperatorImpl} and propagate the results and timer tick to registered operators. + * <p> + * Delegates to {@link #handleTimer(MessageCollector, TaskCoordinator)} for handling the timer tick. 
* * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - public final void onTick(MessageCollector collector, TaskCoordinator coordinator) { - onTimer(collector, coordinator); - nextOperators.forEach(sub -> sub.onTick(collector, coordinator)); + public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + long startNs = this.highResClock.nanoTime(); + Collection<RM> results = handleTimer(collector, coordinator); + long endNs = this.highResClock.nanoTime(); + this.handleTimerNs.update(endNs - startNs); + + results.forEach(rm -> + this.registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + this.registeredOperators.forEach(op -> + op.onTimer(collector, coordinator)); } /** - * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output - * to registered downstream operators. + * Handle the timer tick for this operator and return the results to be propagated to registered operators. + * <p> + * Defaults to a no-op implementation. * * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context + * @return results of the timed operation */ - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + protected Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + return Collections.emptyList(); } /** - * Helper method to propagate the output of this operator to all registered downstream operators. - * - * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer} - * to propagate the operator output correctly. + * Get the {@link OperatorSpec} for this {@link OperatorImpl}. 
* - * @param outputMessage output message - * @param collector the {@link MessageCollector} in the context - * @param coordinator the {@link TaskCoordinator} in the context + * @return the {@link OperatorSpec} for this {@link OperatorImpl} */ - void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); + protected abstract OperatorSpec<RM> getOperatorSpec(); + + private HighResolutionClock createHighResClock(Config config) { + if (new MetricsConfig(config).getMetricsTimerEnabled()) { + return System::nanoTime; + } else { + return () -> 0; + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 8e492dc..d8ea592 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -104,20 +104,22 @@ public class OperatorImplGraph { * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. 
* * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for - * @param <M> the type of messagess in the {@code source} {@link MessageStreamImpl} * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl} * @return root node for the {@link OperatorImpl} DAG */ - private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) { + private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, + Config config, TaskContext context) { // since the source message stream might have multiple operator specs registered on it, // create a new root node as a single point of entry for the DAG. RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>(); + rootOperator.init(config, context); // create the pipeline/topology starting from the source source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { - // pass in the source and context s.t. stateful stream operators can initialize their stores + // pass in the context so that operator implementations can initialize their functions OperatorImpl<M, ?> operatorImpl = - createAndRegisterOperatorImpl(registeredOperator, source, config, context); + createAndRegisterOperatorImpl(registeredOperator, config, context); rootOperator.registerNextOperator(operatorImpl); }); return rootOperator; @@ -127,27 +129,26 @@ public class OperatorImplGraph { * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding * {@link OperatorImpl}s. 
* - * @param operatorSpec the operatorSpec registered with the {@code source} - * @param source the source {@link MessageStreamImpl} - * @param <M> type of input message + * @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param <M> type of input message * @return the operator implementation for the operatorSpec */ private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, - MessageStreamImpl<M> source, Config config, TaskContext context) { + Config config, TaskContext context) { if (!operatorImpls.containsKey(operatorSpec)) { - OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context); + OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context); if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) { // this is the first time we've added the operatorImpl corresponding to the operatorSpec, // so traverse and initialize and register the rest of the DAG. // initialize the corresponding operator function - operatorSpec.init(config, context); + operatorImpl.init(config, context); MessageStreamImpl nextStream = operatorSpec.getNextStream(); if (nextStream != null) { Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { - OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context); + OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context); operatorImpl.registerNextOperator(subImpl); }); } @@ -163,24 +164,21 @@ public class OperatorImplGraph { /** * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. 
* - * @param source the source {@link MessageStreamImpl} - * @param <M> type of input message * @param operatorSpec the immutable {@link OperatorSpec} definition. * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param <M> type of input message * @return the {@link OperatorImpl} implementation instance */ - private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, - OperatorSpec operatorSpec, Config config, TaskContext context) { + private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) { if (operatorSpec instanceof StreamOperatorSpec) { - StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec; - return new StreamOperatorImpl<>(streamOpSpec, source, config, context); + return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { - return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock); + return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock); } throw new IllegalArgumentException( String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index e4cb9c2..c7bdc22 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -18,12 +18,11 @@ */ package org.apache.samza.operators.impl; -import java.util.ArrayList; -import java.util.List; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.metrics.Counter; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; @@ -35,6 +34,11 @@ import org.apache.samza.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + /** * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream * with buffered messages of type {@code JM} in the other stream. 
@@ -47,35 +51,45 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class); + private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec; private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn; private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn; private final long ttlMs; - private final int opId; private final Clock clock; - PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source, + private Counter keysRemoved; + + PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec, Config config, TaskContext context, Clock clock) { - this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn(); - this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn(); - this.ttlMs = partialJoinOperatorSpec.getTtlMs(); - this.opId = partialJoinOperatorSpec.getOpId(); + this.partialJoinOpSpec = partialJoinOpSpec; + this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn(); + this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn(); + this.ttlMs = partialJoinOpSpec.getTtlMs(); this.clock = clock; } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + protected void handleInit(Config config, TaskContext context) { + keysRemoved = context.getMetricsRegistry() + .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed"); + this.thisPartialJoinFn.init(config, context); + } + + @Override + public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { K key = thisPartialJoinFn.getKey(message); thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis())); PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key); long now = 
clock.currentTimeMillis(); if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) { RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage()); - this.propagateResult(joinResult, collector, coordinator); + return Collections.singletonList(joinResult); } + return Collections.emptyList(); } @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { long now = clock.currentTimeMillis(); KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState(); @@ -87,14 +101,18 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { if (entry.getValue().getReceivedTimeMs() < now - ttlMs) { keysToRemove.add(entry.getKey()); } else { - break; + break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order } } iterator.close(); thisState.deleteAll(keysToRemove); - - LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now); + keysRemoved.inc(keysToRemove.size()); + return Collections.emptyList(); } + @Override + protected OperatorSpec<RM> getOperatorSpec() { + return partialJoinOpSpec; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java index eb9b5e2..0f18e97 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -18,9 +18,16 @@ */ package org.apache.samza.operators.impl; +import org.apache.samza.config.Config; +import 
org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * A no-op operator implementation that forwards incoming messages to all of its subscribers. @@ -29,7 +36,32 @@ import org.apache.samza.task.TaskCoordinator; public final class RootOperatorImpl<M> extends OperatorImpl<M, M> { @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.propagateResult(message, collector, coordinator); + protected void handleInit(Config config, TaskContext context) { + } + + @Override + public Collection<M> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(message); + } + + // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message + @Override + protected OperatorSpec<M> getOperatorSpec() { + return new OperatorSpec<M>() { + @Override + public MessageStreamImpl<M> getNextStream() { + return null; + } + + @Override + public OpCode getOpCode() { + return OpCode.INPUT; + } + + @Override + public int getOpId() { + return -1; + } + }; } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java deleted file mode 100644 index 2bb362c..0000000 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Default implementation class of a {@link WindowOperatorSpec} for a session window. 
- * - * @param <M> the type of input message - * @param <RK> the type of window key - * @param <WV> the type of window state - */ -class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> { - - private final WindowOperatorSpec<M, RK, WV> windowSpec; - - SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { - this.windowSpec = windowSpec; - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - } - - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // This is to periodically check the timeout triggers to get the list of window states to be updated - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index f92fbfb..e82737f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -20,27 +20,44 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * Implementation for {@link SinkOperatorSpec} */ class SinkOperatorImpl<M> extends OperatorImpl<M, M> { + private final SinkOperatorSpec<M> sinkOpSpec; private final SinkFunction<M> sinkFn; 
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) { - this.sinkFn = sinkOp.getSinkFn(); + SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) { + this.sinkOpSpec = sinkOpSpec; + this.sinkFn = sinkOpSpec.getSinkFn(); + } + + @Override + protected void handleInit(Config config, TaskContext context) { + this.sinkFn.init(config, context); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + public Collection<M> handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator) { this.sinkFn.apply(message, collector, coordinator); // there should be no further chained operators since this is a terminal operator. - // hence we don't call #propogateResult() here. + return Collections.emptyList(); + } + + @Override + protected OperatorSpec<M> getOperatorSpec() { + return sinkOpSpec; } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index 644de20..bd4dce1 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -19,13 +19,15 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import 
java.util.Collection; + /** * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message. @@ -35,15 +37,28 @@ import org.apache.samza.task.TaskCoordinator; */ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> { + private final StreamOperatorSpec<M, RM> streamOpSpec; private final FlatMapFunction<M, RM> transformFn; - StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { - this.transformFn = streamOperatorSpec.getTransformFn(); + StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec, + Config config, TaskContext context) { + this.streamOpSpec = streamOpSpec; + this.transformFn = streamOpSpec.getTransformFn(); + } + + @Override + protected void handleInit(Config config, TaskContext context) { + transformFn.init(config, context); + } + + @Override + public Collection<RM> handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator) { + return this.transformFn.apply(message); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // call the transform function and then for each output call propagateResult() - this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator)); + protected OperatorSpec<RM> getOperatorSpec() { + return streamOpSpec; } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index cd3b1bc..b99f719 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -20,14 
+20,16 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.config.Config; import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.triggers.RepeatingTriggerImpl; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.TriggerImpl; import org.apache.samza.operators.triggers.TriggerImpls; -import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowKey; @@ -36,6 +38,7 @@ import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.Clock; import org.slf4j.Logger; @@ -46,6 +49,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; /** @@ -61,9 +65,8 @@ import java.util.function.Function; * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}. * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet * or not. Then, the {@link TriggerImplHandler} invokes onMessage on its underlying {@link TriggerImpl} instance.
A - * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The - * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream - * operators. + * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. + * The {@link WindowOperatorImpl} checks if the trigger fired and returns the result of the firing. * * @param <M> the type of the incoming message * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream} @@ -74,56 +77,84 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class); + private final WindowOperatorSpec<M, WK, WV> windowOpSpec; + private final Clock clock; private final WindowInternal<M, WK, WV> window; private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>(); - TriggerScheduler<WK> triggerScheduler ; + private TriggerScheduler<WK> triggerScheduler; // The trigger state corresponding to each {@link TriggerKey}. 
private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>(); - private final Clock clock; - public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) { + public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOpSpec, Clock clock) { + this.windowOpSpec = windowOpSpec; this.clock = clock; - this.window = spec.getWindow(); + this.window = windowOpSpec.getWindow(); this.triggerScheduler= new TriggerScheduler(clock); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + protected void handleInit(Config config, TaskContext context) { + WindowInternal<M, WK, WV> window = windowOpSpec.getWindow(); + if (window.getFoldLeftFunction() != null) { + window.getFoldLeftFunction().init(config, context); + } + } + + @Override + public Collection<WindowPane<WK, WV>> handleMessage( + M message, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Processing message envelope: {}", message); + List<WindowPane<WK, WV>> results = new ArrayList<>(); WindowKey<WK> storeKey = getStoreKey(message); WindowState<WV> existingState = store.get(storeKey); WindowState<WV> newState = applyFoldFunction(existingState, message); - LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp()); + LOG.trace("New window value: {}, earliest timestamp: {}", + newState.getWindowValue(), newState.getEarliestTimestamp()); store.put(storeKey, newState); if (window.getEarlyTrigger() != null) { TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); - getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger()) - .onMessage(triggerKey, message, collector, coordinator); + TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger()); + Optional<WindowPane<WK, WV>> maybeTriggeredPane = + triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + 
maybeTriggeredPane.ifPresent(results::add); } if (window.getDefaultTrigger() != null) { TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); - getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger()) - .onMessage(triggerKey, message, collector, coordinator); + TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger()); + Optional<WindowPane<WK, WV>> maybeTriggeredPane = + triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + maybeTriggeredPane.ifPresent(results::add); } + + return results; } @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + List<WindowPane<WK, WV>> results = new ArrayList<>(); + List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks(); for (TriggerKey<WK> key : keys) { TriggerImplHandler triggerImplHandler = triggers.get(key); if (triggerImplHandler != null) { - triggerImplHandler.onTimer(key, collector, coordinator); + Optional<WindowPane<WK, WV>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); + maybeTriggeredPane.ifPresent(results::add); } } + return results; + } + + @Override + protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() { + return windowOpSpec; } /** @@ -168,7 +199,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK return newState; } - private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) { + private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> triggerKey, Trigger<M> trigger) { TriggerImplHandler wrapper = triggers.get(triggerKey); if (wrapper != null) { LOG.trace("Returning existing trigger wrapper for {}", triggerKey); @@ -185,9 +216,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK } /** 
- * Handles trigger firings, and propagates results to downstream operators. + * Handles trigger firings and returns the optional result. */ - private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) { + private Optional<WindowPane<WK, WV>> onTriggerFired( + TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Trigger key {} fired." , triggerKey); TriggerImplHandler wrapper = triggers.get(triggerKey); @@ -196,11 +228,10 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK if (state == null) { LOG.trace("No state found for triggerKey: {}", triggerKey); - return; + return Optional.empty(); } WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state); - super.propagateResult(paneOutput, collector, coordinator); // Handle accumulation modes. if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { @@ -228,6 +259,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) { cancelTrigger(triggerKey, false); } + + return Optional.of(paneOutput); } /** @@ -248,7 +281,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal); } - WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); + WindowPane<WK, WV> paneOutput = + new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); LOG.trace("Emitting pane output for trigger key {}", triggerKey); return paneOutput; } @@ -279,7 +313,8 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK this.impl = impl; } - public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { + public Optional<WindowPane<WK, WV>> 
onMessage(TriggerKey<WK> triggerKey, M message, + MessageCollector collector, TaskCoordinator coordinator) { if (!isCancelled) { LOG.trace("Forwarding callbacks for {}", message); impl.onMessage(message, triggerScheduler); @@ -289,12 +324,14 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK if (impl instanceof RepeatingTriggerImpl) { ((RepeatingTriggerImpl<M, WK>) impl).clear(); } - onTriggerFired(triggerKey, collector, coordinator); + return onTriggerFired(triggerKey, collector, coordinator); } } + return Optional.empty(); } - public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) { + public Optional<WindowPane<WK, WV>> onTimer( + TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) { if (impl.shouldFire() && !isCancelled) { LOG.trace("Triggering timer triggers"); @@ -302,8 +339,9 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK if (impl instanceof RepeatingTriggerImpl) { ((RepeatingTriggerImpl<M, WK>) impl).clear(); } - onTriggerFired(key, collector, coordinator); + return onTriggerFired(key, collector, coordinator); } + return Optional.empty(); } public void cancel() { @@ -315,5 +353,4 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK return this.impl instanceof RepeatingTriggerImpl; } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 18090e2..cc3c4ab 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -19,9 +19,7 @@ package 
org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.task.TaskContext; /** @@ -34,6 +32,7 @@ import org.apache.samza.task.TaskContext; public interface OperatorSpec<OM> { enum OpCode { + INPUT, MAP, FLAT_MAP, FILTER, @@ -64,10 +63,11 @@ public interface OperatorSpec<OM> { int getOpId(); /** - * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task + * Get the name for this operator based on its opCode and opId. + * @return the name for this operator */ - default void init(Config config, TaskContext context) { } + default String getOpName() { + return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId()); + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java index b1dc529..e85626f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -18,10 +18,8 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.task.TaskContext; /** @@ -88,10 +86,4 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> { public int getOpId() { return this.opId; } - - 
@Override - public void init(Config config, TaskContext context) { - this.thisPartialJoinFn.init(config, context); - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 7de85f3..0d135d3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -18,14 +18,12 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.stream.OutputStreamInternal; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -101,11 +99,6 @@ public class SinkOperatorSpec<M> implements OperatorSpec { return this.opId; } - @Override - public void init(Config config, TaskContext context) { - this.sinkFn.init(config, context); - } - /** * Creates a {@link SinkFunction} to send messages to the provided {@code output}. 
* @param outputStream the {@link OutputStreamInternal} to send messages to http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index f9bbe2d..204e566 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -18,10 +18,8 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.task.TaskContext; /** @@ -71,9 +69,4 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { public int getOpId() { return this.opId; } - - @Override - public void init(Config config, TaskContext context) { - this.transformFn.init(config, context); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 9515e38..73b17b5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,11 +19,9 @@ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.WindowPane; import 
org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.task.TaskContext; /** @@ -53,13 +51,6 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK } @Override - public void init(Config config, TaskContext context) { - if (window.getFoldLeftFunction() != null) { - window.getFoldLeftFunction().init(config, context); - } - } - - @Override public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() { return this.nextStream; } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index be52565..4720298 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -118,17 +118,19 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream(); InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream); - // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder. RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream); if (rootOperatorImpl != null) { - rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator); + // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde + // before applying the msgBuilder. 
+ Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()); + rootOperatorImpl.onMessage(message, collector, coordinator); } } @Override public final void window(MessageCollector collector, TaskCoordinator coordinator) { operatorImplGraph.getAllRootOperators() - .forEach(rootOperator -> rootOperator.onTick(collector, coordinator)); + .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 7a6f959..23b67aa 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -19,13 +19,10 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; @@ -42,6 +39,11 @@ import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; import org.junit.Test; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -237,6 +239,8 @@ public class TestJoinOperator { 
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); + when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + Config config = mock(Config.class); StreamApplication sgb = new TestStreamApplication(); http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index 6603137..597244e 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -26,9 +26,8 @@ import junit.framework.Assert; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.triggers.FiringType; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.testUtils.TestClock; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; @@ -36,11 +35,13 @@ import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; import org.apache.samza.task.TaskContext; import 
org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.TestClock; import org.junit.Before; import org.junit.Test; @@ -71,6 +72,7 @@ public class TestWindowOperator { runner = mock(ApplicationRunner.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka")); } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index f978c3c..bd18f0b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,61 +18,205 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.TestMessageEnvelope; -import org.apache.samza.operators.data.TestOutputMessageEnvelope; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -import org.hamcrest.core.IsEqual; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.argThat; +import java.util.Collection; +import 
java.util.Collections; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestOperatorImpl { - TestMessageEnvelope curInputMsg; - MessageCollector curCollector; - TaskCoordinator curCoordinator; + @Test(expected = IllegalStateException.class) + public void testMultipleInitShouldThrow() { + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class)); + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(mock(Config.class), mockTaskContext); + } + + @Test(expected = IllegalStateException.class) + public void testRegisterNextOperatorBeforeInitShouldThrow() { + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class)); + opImpl.registerNextOperator(mock(OperatorImpl.class)); + } + + @Test + public void testOnMessagePropagatesResults() { + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // register a couple of operators + OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); + when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl1); + + OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); + 
when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl2); + + // send a message to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator); + + // verify that it propagates its handleMessage results to next operators + verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + } + + @Test + public void testOnMessageUpdatesMetrics() { + TaskContext mockTaskContext = mock(TaskContext.class); + MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + Counter mockCounter = mock(Counter.class); + Timer mockTimer = mock(Timer.class); + when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter); + when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // send a message to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator); + + // verify that it updates message count and timer metrics + verify(mockCounter, times(1)).inc(); + verify(mockTimer, times(1)).update(anyLong()); + } + + @Test + public void testOnTimerPropagatesResultsAndTimer() { + 
TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // register a couple of operators + OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); + when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl1); + + OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); + when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl2); + + // send a timer tick to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onTimer(mockCollector, mockCoordinator); + + // verify that it propagates its handleTimer results to next operators + verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + + // verify that it propagates the timer tick to next operators + verify(mockNextOpImpl1, times(1)).handleTimer(mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleTimer(mockCollector, mockCoordinator); + } @Test - public void testSubscribers() { - this.curInputMsg = null; - this.curCollector = null; - this.curCoordinator = null; - OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> 
opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() { - @Override - public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { - TestOperatorImpl.this.curInputMsg = message; - TestOperatorImpl.this.curCollector = collector; - TestOperatorImpl.this.curCoordinator = coordinator; - } - @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - - } - - }; - // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext() - OperatorImpl mockSub = mock(OperatorImpl.class); - opImpl.registerNextOperator(mockSub); - TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class); + public void testOnTimerUpdatesMetrics() { + TaskContext mockTaskContext = mock(TaskContext.class); + MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + Counter mockMessageCounter = mock(Counter.class); + Timer mockTimer = mock(Timer.class); + when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter); + when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // send a message to this operator MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.propagateResult(xOutput, mockCollector, mockCoordinator); - verify(mockSub, times(1)).onNext( - argThat(new IsEqual<>(xOutput)), - argThat(new IsEqual<>(mockCollector)), - argThat(new IsEqual<>(mockCoordinator)) - ); - // verify onNext() is invoked correctly - TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); - opImpl.onNext(mockInput, mockCollector, mockCoordinator); - 
assertEquals(mockInput, this.curInputMsg); - assertEquals(mockCollector, this.curCollector); - assertEquals(mockCoordinator, this.curCoordinator); + opImpl.onTimer(mockCollector, mockCoordinator); + + // verify that it updates metrics + verify(mockMessageCounter, times(0)).inc(); + verify(mockTimer, times(1)).update(anyLong()); + } + + private static class TestOpImpl extends OperatorImpl<Object, Object> { + private final Object mockOutput; + + TestOpImpl(Object mockOutput) { + this.mockOutput = mockOutput; + } + + @Override + protected void handleInit(Config config, TaskContext context) {} + + @Override + public Collection<Object> handleMessage(Object message, + MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(mockOutput); + } + + @Override + public Collection<Object> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(mockOutput); + } + + @Override + protected OperatorSpec<Object> getOperatorSpec() { + return new TestOpSpec(); + } + } + + private static class TestOpSpec implements OperatorSpec<Object> { + @Override + public MessageStreamImpl<Object> getNextStream() { + return null; + } + + @Override + public OpCode getOpCode() { + return OpCode.INPUT; + } + + @Override + public int getOpId() { + return -1; + } } } + http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index 267cdfc..a75fadb 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import 
org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TestMessageStreamImplUtil; @@ -26,11 +27,10 @@ import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.windows.Windows; @@ -62,14 +62,15 @@ public class TestOperatorImpls { @Before public void prep() throws NoSuchFieldException, NoSuchMethodException { - nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators"); + nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators"); nextOperatorsField.setAccessible(true); - createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, + createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", OperatorSpec.class, Config.class, TaskContext.class); createOpMethod.setAccessible(true); - createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); + createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, + Config.class, TaskContext.class); createOpsMethod.setAccessible(true); } @@ -79,13 
+80,12 @@ public class TestOperatorImpls { WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING); when(mockWnd.getWindow()).thenReturn(windowInternal); - MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); TaskContext mockContext = mock(TaskContext.class); OperatorImplGraph opGraph = new OperatorImplGraph(); OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>) - createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext); + createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext); assertTrue(opImpl instanceof WindowOperatorImpl); Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window"); wndInternalField.setAccessible(true); @@ -96,7 +96,7 @@ public class TestOperatorImpls { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class); when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext); assertTrue(opImpl instanceof StreamOperatorImpl); Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn"); txfmFnField.setAccessible(true); @@ -106,7 +106,7 @@ public class TestOperatorImpls { SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { }; SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, 
mockStream, sinkOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext); assertTrue(opImpl instanceof SinkOperatorImpl); Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn"); sinkFnField.setAccessible(true); @@ -114,8 +114,7 @@ public class TestOperatorImpls { // get join operator PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class); - PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class); - opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); + opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext); assertTrue(opImpl instanceof PartialJoinOperatorImpl); } @@ -124,6 +123,7 @@ public class TestOperatorImpls { // test creation of empty chain MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); OperatorImplGraph opGraph = new OperatorImplGraph(); RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext); @@ -134,8 +134,9 @@ public class TestOperatorImpls { public void testLinearChain() throws IllegalAccessException, InvocationTargetException { // test creation of linear chain StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); + MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = 
mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10))); OperatorImplGraph opGraph = new OperatorImplGraph(); @@ -154,8 +155,9 @@ public class TestOperatorImpls { public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException { // test creation of broadcast chain StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); + MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } }); testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m); @@ -187,6 +189,7 @@ public class TestOperatorImpls { MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); input1 .join(input2, http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index abd7740..1c01e57 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -27,7 +27,10 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestSinkOperatorImpl { @@ -44,7 +47,7 @@ public class TestSinkOperatorImpl { MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator); + sinkImpl.handleMessage(mockMsg, mockCollector, mockCoordinator); verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator); } } http://git-wip-us.apache.org/repos/asf/samza/blob/92ae4c62/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 9dd161a..36d7b92 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -18,10 +18,7 @@ */ package org.apache.samza.operators.impl; -import java.util.ArrayList; -import java.util.Collection; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.data.TestMessageEnvelope; import 
org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; @@ -31,7 +28,15 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import static org.mockito.Mockito.*; +import java.util.ArrayList; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestStreamOperatorImpl { @@ -42,10 +47,10 @@ public class TestStreamOperatorImpl { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); - MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); TaskContext mockContext = mock(TaskContext.class); - StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext)); + StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = + spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext)); TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class); TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class); Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { { @@ -54,8 +59,9 @@ public class TestStreamOperatorImpl { when(txfmFn.apply(inMsg)).thenReturn(mockOutputs); MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.onNext(inMsg, mockCollector, mockCoordinator); + Collection<TestOutputMessageEnvelope> results = opImpl + 
.handleMessage(inMsg, mockCollector, mockCoordinator); verify(txfmFn, times(1)).apply(inMsg); - verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator); + assertEquals(results, mockOutputs); } }
