SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.
Provided join operator access to durable state stores. Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #309 from prateekm/operator-store-wiring Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a671288e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a671288e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a671288e Branch: refs/heads/master Commit: a671288e18dea3336fe1c625233bb7427ed8b5b2 Parents: ad80cf9 Author: Prateek Maheshwari <[email protected]> Authored: Wed Oct 4 15:37:50 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Oct 4 15:37:50 2017 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../apache/samza/operators/MessageStream.java | 13 ++- .../operators/functions/WatermarkFunction.java | 31 +++--- .../apache/samza/container/TaskContextImpl.java | 6 +- .../coordinator/DistributedLockWithState.java | 4 +- .../org/apache/samza/execution/JobNode.java | 63 ++++++++--- .../samza/operators/MessageStreamImpl.java | 16 +-- .../functions/PartialJoinFunction.java | 26 +---- .../samza/operators/impl/OperatorImplGraph.java | 38 +++++-- .../operators/impl/PartialJoinOperatorImpl.java | 72 ++++-------- .../operators/impl/store/TimeSeriesStore.java | 1 + .../operators/impl/store/TimestampedValue.java | 16 +-- .../impl/store/TimestampedValueSerde.java | 55 +++++++++ .../samza/operators/spec/JoinOperatorSpec.java | 70 +++++++++--- .../samza/operators/spec/OperatorSpecs.java | 17 ++- .../operators/spec/StatefulOperatorSpec.java | 37 +++++++ .../samza/operators/spec/StoreDescriptor.java | 81 ++++++++++++++ .../samza/runtime/LocalApplicationRunner.java | 2 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 5 +- .../apache/samza/container/SamzaContainer.scala | 8 +- .../samza/example/OrderShipmentJoinExample.java | 4 +- .../samza/execution/TestExecutionPlanner.java 
| 42 ++++--- .../execution/TestJobGraphJsonGenerator.java | 19 +++- .../org/apache/samza/execution/TestJobNode.java | 111 ++++++++++++++----- .../samza/operators/TestJoinOperator.java | 12 +- .../samza/operators/TestMessageStreamImpl.java | 3 +- .../operators/impl/TestOperatorImplGraph.java | 31 ++++-- .../impl/store/TestTimeSeriesStoreImpl.java | 54 ++++----- .../impl/store/TestTimestampedValueSerde.java | 52 +++++++++ .../apache/samza/test/operator/PageView.java | 50 --------- .../test/operator/RepartitionJoinWindowApp.java | 92 +++++++++++++++ .../test/operator/RepartitionWindowApp.java | 55 --------- .../samza/test/operator/SessionWindowApp.java | 1 + ...StreamApplicationIntegrationTestHarness.java | 12 +- .../operator/TestRepartitionJoinWindowApp.java | 81 ++++++++++++++ .../test/operator/TestRepartitionWindowApp.java | 70 ------------ .../samza/test/operator/TumblingWindowApp.java | 1 + .../samza/test/operator/data/AdClick.java | 41 +++++++ .../samza/test/operator/data/PageView.java | 50 +++++++++ .../test/operator/data/UserPageAdClick.java | 62 +++++++++++ 40 files changed, 974 insertions(+), 431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 7cbffe7..c2e9180 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ docs/_site build **/bin samza-test/state +state/ docs/learn/documentation/*/api/javadocs docs/learn/documentation/*/rest/javadocs .DS_Store http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 2a1045d..c36fe1f 100644 --- 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; import java.time.Duration; import java.util.ArrayList; @@ -128,13 +129,17 @@ public interface MessageStream<M> { * @param otherStream the other {@link MessageStream} to be joined with * @param joinFn the function to join messages from this and the other {@link MessageStream} * @param ttl the ttl for messages in each stream + * @param keySerde the serde for the join key + * @param messageSerde the serde for messages in this stream + * @param otherMessageSerde the serde for messages in the other stream * @param <K> the type of join key - * @param <JM> the type of messages in the other stream - * @param <OM> the type of messages resulting from the {@code joinFn} + * @param <OM> the type of messages in the other stream + * @param <JM> the type of messages resulting from the {@code joinFn} * @return the joined {@link MessageStream} */ - <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream, - JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl); + <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream, + JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn, + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl); /** * Merges all {@code otherStreams} with this {@link MessageStream}. 
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java index 9c4b9bf..3be293e 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java @@ -20,14 +20,14 @@ package org.apache.samza.operators.functions; /** - * Allows user-specific handling of Watermark + * Allows handling of watermarks. */ public interface WatermarkFunction { /** * Processes the input watermark coming from upstream operators. - * This allows user-defined watermark handling, such as trigger events - * or propagate it to downstream. + * This allows custom watermark handling, such as triggering events or propagating it downstream. + * * @param watermark input watermark */ void processWatermark(long watermark); @@ -35,24 +35,19 @@ public interface WatermarkFunction { /** * Returns the output watermark. This function will be invoked immediately after either * of the following events: - * * <ol> - * - * <li> Return of the transform function, e.g. {@link FlatMapFunction}. - * - * <li> Return of the processWatermark function. - * + * <li> Return from the transform function, e.g. {@link FlatMapFunction}. + * <li> Return from the {@link #processWatermark} function. * </ol> + + * Note: If the transform function returns a collection of messages, the output watermark + * will be emitted after the output collection has been propagated to downstream operators. + * This might delay the watermark propagation, which will cause more buffering and might + * have a performance impact. 
* - * - * - * Note: If the transform function returns a collection of output, the output watermark - * will be emitted after the output collection is propagated to downstream operators. So - * it might delay the watermark propagation. The delay will cause more buffering and might - * have performance impact. - * - * @return output watermark, or null if the output watermark should not be updated. Samza - * guarantees that the same watermark value will be only emitted once. + * @return output watermark, or null if the output watermark should not be updated. + * Samza guarantees that the same watermark value will only be emitted once. */ Long getOutputWatermark(); + } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index 7990d2b..aa622a3 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.ReadableMetricsRegistry; -import org.apache.samza.storage.StorageEngine; import org.apache.samza.storage.TaskStorageManager; +import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.TaskContext; @@ -79,9 +79,9 @@ public class TaskContextImpl implements TaskContext { } @Override - public StorageEngine getStore(String storeName) { + public KeyValueStore getStore(String storeName) { if (storageManager != null) { - return 
storageManager.apply(storeName); + return (KeyValueStore) storageManager.apply(storeName); } else { LOG.warn("No store found for name: {}", storeName); return null; http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java index 0de7813..c8e9033 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java @@ -26,12 +26,12 @@ import java.util.concurrent.TimeoutException; public interface DistributedLockWithState { /** - * Trie to acquire the lock, but first check if the state flag is set. If it is set, return false. + * Try to acquire the lock, but first check if the state flag is set. If it is set, return false. * If the flag is not set, and lock is acquired - return true. - * Throw TimeOutException if could not acquire the lock. * @param timeout Duration of lock acquiring timeout. * @param unit Time Unit of the timeout defined above. * @return true if lock is acquired successfully, false if state is already set. + * @throws TimeoutException if could not acquire the lock. 
*/ boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException; http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 0368829..2e89292 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -33,6 +33,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.StreamGraphImpl; @@ -40,6 +41,7 @@ import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.spec.StatefulOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.util.MathUtils; import org.apache.samza.serializers.Serde; @@ -129,6 +131,14 @@ public class JobNode { } } + streamGraph.getAllOperatorSpecs().forEach(opSpec -> { + if (opSpec instanceof StatefulOperatorSpec) { + ((StatefulOperatorSpec) opSpec).getStoreDescriptors() + .forEach(sd -> configs.putAll(sd.getStorageConfigs())); + // store key and message serdes are configured separately in #addSerdeConfigs + } + }); + configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); // write input/output streams to configs @@ -168,32 +178,42 @@ public class JobNode { * * @param configs the configs to add serialized serde instances and 
stream serde configs to */ - protected void addSerdeConfigs(Map<String, String> configs) { + void addSerdeConfigs(Map<String, String> configs) { // collect all key and msg serde instances for streams - Map<String, Serde> keySerdes = new HashMap<>(); - Map<String, Serde> msgSerdes = new HashMap<>(); + Map<String, Serde> streamKeySerdes = new HashMap<>(); + Map<String, Serde> streamMsgSerdes = new HashMap<>(); Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators(); inEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec()); - Serde keySerde = inputOperatorSpec.getKeySerde(); - Serde valueSerde = inputOperatorSpec.getValueSerde(); - keySerdes.put(streamId, keySerde); - msgSerdes.put(streamId, valueSerde); + streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde()); + streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde()); }); Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams(); outEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec()); - Serde keySerde = outputStream.getKeySerde(); - Serde valueSerde = outputStream.getValueSerde(); - keySerdes.put(streamId, keySerde); - msgSerdes.put(streamId, valueSerde); + streamKeySerdes.put(streamId, outputStream.getKeySerde()); + streamMsgSerdes.put(streamId, outputStream.getValueSerde()); + }); + + // collect all key and msg serde instances for stores + Map<String, Serde> storeKeySerdes = new HashMap<>(); + Map<String, Serde> storeMsgSerdes = new HashMap<>(); + streamGraph.getAllOperatorSpecs().forEach(opSpec -> { + if (opSpec instanceof StatefulOperatorSpec) { + ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> { + storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde()); + 
storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde()); + }); + } }); - // for each unique serde instance, generate a unique name and serialize to config - HashSet<Serde> serdes = new HashSet<>(keySerdes.values()); - serdes.addAll(msgSerdes.values()); + // for each unique stream or store serde instance, generate a unique name and serialize to config + HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values()); + serdes.addAll(streamMsgSerdes.values()); + serdes.addAll(storeKeySerdes.values()); + serdes.addAll(storeMsgSerdes.values()); SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); Base64.Encoder base64Encoder = Base64.getEncoder(); Map<Serde, String> serdeUUIDs = new HashMap<>(); @@ -205,17 +225,28 @@ public class JobNode { }); // set key and msg serdes for streams to the serde names generated above - keySerdes.forEach((streamId, serde) -> { + streamKeySerdes.forEach((streamId, serde) -> { String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE(); configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); }); - msgSerdes.forEach((streamId, serde) -> { + streamMsgSerdes.forEach((streamId, serde) -> { String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId); String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE(); configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde)); }); + + // set key and msg serdes for stores to the serde names generated above + storeKeySerdes.forEach((storeName, serde) -> { + String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName); + configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); + }); + + storeMsgSerdes.forEach((storeName, serde) -> { + String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); + configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); + }); } /** 
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 7b93a9e..8460ada 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -37,6 +37,7 @@ import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; import java.time.Duration; import java.util.Collection; @@ -112,15 +113,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> { } @Override - public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream, - JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) { - OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec(); - JoinOperatorSpec<K, M, JM, TM> joinOpSpec = - OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, - (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId()); + public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream, + JoinFunction<? extends K, ? super M, ? super OM, ? 
extends JM> joinFn, + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) { + OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec(); + JoinOperatorSpec<K, M, OM, JM> joinOpSpec = + OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, + keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId()); this.operatorSpec.registerNextOperatorSpec(joinOpSpec); - otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec); + otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec); return new MessageStreamImpl<>(this.graph, joinOpSpec); } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java index 9b7956a..5ede5e8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java +++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java @@ -18,21 +18,22 @@ */ package org.apache.samza.operators.functions; +import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.storage.kv.KeyValueStore; /** * An internal function that maintains state and join logic for one side of a two-way join. */ -public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction { +public interface PartialJoinFunction<K, M, OM, JM> extends InitableFunction, ClosableFunction { /** * Joins a message in this stream with a message from another stream. 
* * @param m message from this input stream - * @param jm message from the other input stream + * @param om message from the other input stream * @return the joined message in the output stream */ - RM apply(M m, JM jm); + JM apply(M m, OM om); /** * Gets the key for the input message. @@ -47,23 +48,6 @@ public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, Clo * * @return the key value store containing the state for this stream */ - KeyValueStore<K, PartialJoinMessage<M>> getState(); + KeyValueStore<K, TimestampedValue<M>> getState(); - class PartialJoinMessage<M> { - private final M message; - private final long receivedTimeMs; - - public PartialJoinMessage(M message, long receivedTimeMs) { - this.message = message; - this.receivedTimeMs = receivedTimeMs; - } - - public M getMessage() { - return message; - } - - public long getReceivedTimeMs() { - return receivedTimeMs; - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 1f86975..808ddbf 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -37,7 +37,7 @@ import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.util.InternalInMemoryStore; +import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStream; import 
org.apache.samza.task.TaskContext; @@ -219,16 +219,17 @@ public class OperatorImplGraph { private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) { return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(), - joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn()))); + joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), createRightJoinFn(joinOpSpec))); } - private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) { + private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinOperatorSpec joinOpSpec) { return new PartialJoinFunction<Object, Object, Object, Object>() { - private KeyValueStore<Object, PartialJoinMessage<Object>> leftStreamState = new InternalInMemoryStore<>(); + private final JoinFunction joinFn = joinOpSpec.getJoinFn(); + private KeyValueStore<Object, TimestampedValue<Object>> leftStreamState; @Override - public Object apply(Object m, Object jm) { - return joinFn.apply(m, jm); + public Object apply(Object m, Object om) { + return joinFn.apply(m, om); } @Override @@ -237,12 +238,15 @@ public class OperatorImplGraph { } @Override - public KeyValueStore<Object, PartialJoinMessage<Object>> getState() { + public KeyValueStore<Object, TimestampedValue<Object>> getState() { return leftStreamState; } @Override public void init(Config config, TaskContext context) { + String leftStoreName = joinOpSpec.getLeftOpName(); + leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName); + // user-defined joinFn should only be initialized once, so we do it only in left partial join function. 
joinFn.init(config, context); } @@ -255,13 +259,14 @@ public class OperatorImplGraph { }; } - private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinFunction joinFn) { + private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinOperatorSpec joinOpSpec) { return new PartialJoinFunction<Object, Object, Object, Object>() { - private KeyValueStore<Object, PartialJoinMessage<Object>> rightStreamState = new InternalInMemoryStore<>(); + private final JoinFunction joinFn = joinOpSpec.getJoinFn(); + private KeyValueStore<Object, TimestampedValue<Object>> rightStreamState; @Override - public Object apply(Object m, Object jm) { - return joinFn.apply(jm, m); + public Object apply(Object m, Object om) { + return joinFn.apply(om, m); } @Override @@ -270,7 +275,16 @@ public class OperatorImplGraph { } @Override - public KeyValueStore<Object, PartialJoinMessage<Object>> getState() { + public void init(Config config, TaskContext context) { + String rightStoreName = joinOpSpec.getRightOpName(); + rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName); + + // user-defined joinFn should only be initialized once, + // so we do it only in left partial join function and not here again. 
+ } + + @Override + public KeyValueStore<Object, TimestampedValue<Object>> getState() { return rightStreamState; } }; http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index ad66962..e976a43 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -19,47 +19,39 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; -import org.apache.samza.metrics.Counter; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage; +import org.apache.samza.operators.impl.store.TimestampedValue; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueIterator; -import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.Clock; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; /** * Implementation of one side of a {@link JoinOperatorSpec} that buffers and joins its input messages of - * type {@code M} with buffered input messages of type {@code JM} in the paired {@link PartialJoinOperatorImpl}. + * type {@code M} with buffered input messages of type {@code OM} in the paired {@link PartialJoinOperatorImpl}. 
* * @param <K> the type of join key * @param <M> the type of input messages on this side of the join - * @param <JM> the type of input message on the other side of the join - * @param <RM> the type of join result + * @param <OM> the type of input message on the other side of the join + * @param <JM> the type of join result */ -class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { +class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> { - private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec; + private final JoinOperatorSpec<K, M, OM, JM> joinOpSpec; private final boolean isLeftSide; // whether this operator impl is for the left side of the join - private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn; - private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn; + private final PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn; + private final PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn; private final long ttlMs; private final Clock clock; - private Counter keysRemoved; - - PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean isLeftSide, - PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, - PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, + PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean isLeftSide, + PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn, + PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn, Config config, TaskContext context, Clock clock) { this.joinOpSpec = joinOpSpec; this.isLeftSide = isLeftSide; @@ -71,54 +63,29 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { @Override protected void handleInit(Config config, TaskContext context) { - keysRemoved = context.getMetricsRegistry() - .newCounter(OperatorImpl.class.getName(), getOperatorName() + "-keys-removed"); this.thisPartialJoinFn.init(config, context); } @Override - public Collection<RM> handleMessage(M message, MessageCollector collector, 
TaskCoordinator coordinator) { + public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { K key = thisPartialJoinFn.getKey(message); - thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis())); - PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key); + thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis())); + TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key); long now = clock.currentTimeMillis(); - if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) { - RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage()); + if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) { + JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue()); return Collections.singletonList(joinResult); } return Collections.emptyList(); } @Override - public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { - long now = clock.currentTimeMillis(); - - KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState(); - KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all(); - List<K> keysToRemove = new ArrayList<>(); - - while (iterator.hasNext()) { - Entry<K, PartialJoinMessage<M>> entry = iterator.next(); - if (entry.getValue().getReceivedTimeMs() < now - ttlMs) { - keysToRemove.add(entry.getKey()); - } else { - break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order - } - } - - iterator.close(); - thisState.deleteAll(keysToRemove); - keysRemoved.inc(keysToRemove.size()); - return Collections.emptyList(); - } - - @Override protected void handleClose() { this.thisPartialJoinFn.close(); } - protected OperatorSpec<M, RM> getOperatorSpec() { - return (OperatorSpec<M, RM>) joinOpSpec; + protected OperatorSpec<M, JM> getOperatorSpec() { + return 
(OperatorSpec<M, JM>) joinOpSpec; } /** @@ -129,7 +96,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> { */ @Override protected String getOperatorName() { - String side = isLeftSide ? "L" : "R"; - return this.joinOpSpec.getOpName() + "-" + side; + return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java index e544e2e..56d839e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java @@ -52,6 +52,7 @@ public interface TimeSeriesStore<K, V> { * @param key the key to look up in the store * @param startTimestamp the start timestamp of the range, inclusive * @param endTimestamp the end timestamp of the range, exclusive + * @return an iterator over the values for the given key in the provided time-range that must be closed after use * @throws IllegalArgumentException when startTimeStamp > endTimestamp, or when either of them is negative */ ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp); http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java index ad5e844..5e45148 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java @@ -26,18 +26,18 @@ package org.apache.samza.operators.impl.store; */ public class TimestampedValue<V> { private final V value; - private final Long timestamp; + private final long timestamp; - public TimestampedValue(V v, Long time) { - value = v; - timestamp = time; + public TimestampedValue(V v, long timestamp) { + this.value = v; + this.timestamp = timestamp; } public V getValue() { return value; } - public Long getTimestamp() { + public long getTimestamp() { return timestamp; } @@ -48,14 +48,14 @@ public class TimestampedValue<V> { TimestampedValue<?> that = (TimestampedValue<?>) o; - if (value != null ? !value.equals(that.value) : that.value != null) return false; - return timestamp.equals(that.timestamp); + if (timestamp != that.timestamp) return false; + return value != null ? value.equals(that.value) : (that.value == null); } @Override public int hashCode() { int result = value != null ? value.hashCode() : 0; - result = 31 * result + timestamp.hashCode(); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); return result; } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java new file mode 100644 index 0000000..b14f8a4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl.store; + +import org.apache.samza.serializers.Serde; + +import java.nio.ByteBuffer; + + +public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> { + private static final int TIMESTAMP_BYTES = 8; + private final Serde<V> vSerde; + + public TimestampedValueSerde(Serde<V> vSerde) { + this.vSerde = vSerde; + } + + @Override + public TimestampedValue<V> fromBytes(byte[] bytes) { + ByteBuffer bb = ByteBuffer.wrap(bytes); + byte[] vBytes = new byte[bytes.length - TIMESTAMP_BYTES]; + bb.get(vBytes, 0, vBytes.length); + V v = vSerde.fromBytes(vBytes); + long ts = bb.getLong(); + return new TimestampedValue<>(v, ts); + } + + @Override + public byte[] toBytes(TimestampedValue<V> tv) { + byte[] vBytes = vSerde.toBytes(tv.getValue()); + int vBytesLength = vBytes != null ? 
vBytes.length : 0; + ByteBuffer bb = ByteBuffer.allocate(vBytesLength + TIMESTAMP_BYTES); + if (vBytes != null) { + bb.put(vBytes); + } + bb.putLong(tv.getTimestamp()); + return bb.array(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index f4fe0fd..3f99280 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -18,8 +18,16 @@ */ package org.apache.samza.operators.spec; +import com.google.common.collect.ImmutableMap; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.operators.impl.store.TimestampedValueSerde; +import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.serializers.Serde; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; /** @@ -28,14 +36,17 @@ import org.apache.samza.operators.functions.WatermarkFunction; * * @param <K> the type of join key * @param <M> the type of message in this stream - * @param <JM> the type of message in the other stream - * @param <RM> the type of join result + * @param <OM> the type of message in the other stream + * @param <JM> the type of join result */ -public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { // Object == M | JM +public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> implements StatefulOperatorSpec { // Object == M | OM private final OperatorSpec<?, M> leftInputOpSpec; - private final OperatorSpec<?, JM> rightInputOpSpec; - private final 
JoinFunction<K, M, JM, RM> joinFn; + private final OperatorSpec<?, OM> rightInputOpSpec; + private final JoinFunction<K, M, OM, JM> joinFn; + private final Serde<K> keySerde; + private final Serde<TimestampedValue<M>> messageSerde; + private final Serde<TimestampedValue<OM>> otherMessageSerde; private final long ttlMs; /** @@ -47,15 +58,45 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { / * @param ttlMs the ttl in ms for retaining messages in each stream * @param opId the unique ID for this operator */ - JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec, - JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) { + JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, + JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, + long ttlMs, int opId) { super(OpCode.JOIN, opId); this.leftInputOpSpec = leftInputOpSpec; this.rightInputOpSpec = rightInputOpSpec; this.joinFn = joinFn; + this.keySerde = keySerde; + this.messageSerde = new TimestampedValueSerde<>(messageSerde); + this.otherMessageSerde = new TimestampedValueSerde<>(otherMessageSerde); this.ttlMs = ttlMs; } + @Override + public Collection<StoreDescriptor> getStoreDescriptors() { + String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; + String leftStoreName = getLeftOpName(); + String rightStoreName = getRightOpName(); + Map<String, String> leftStoreCustomProps = ImmutableMap.of( + String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs), + String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete", + String.format("stores.%s.changelog.kafka.retention.ms", leftStoreName), Long.toString(ttlMs)); + Map<String, String> rightStoreCustomProps = ImmutableMap.of( + String.format("stores.%s.rocksdb.ttl.ms", rightStoreName), Long.toString(ttlMs), + 
String.format("stores.%s.changelog.kafka.cleanup.policy", rightStoreName), "delete", + String.format("stores.%s.changelog.kafka.retention.ms", rightStoreName), Long.toString(ttlMs)); + + return Arrays.asList( + new StoreDescriptor(leftStoreName, rocksDBStoreFactory, this.keySerde, this.messageSerde, + leftStoreName, leftStoreCustomProps), + new StoreDescriptor(rightStoreName, rocksDBStoreFactory, this.keySerde, this.otherMessageSerde, + rightStoreName, rightStoreCustomProps)); + } + + @Override + public WatermarkFunction getWatermarkFn() { + return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null; + } + public OperatorSpec getLeftInputOpSpec() { return leftInputOpSpec; } @@ -64,16 +105,19 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { / return rightInputOpSpec; } - public JoinFunction<K, M, JM, RM> getJoinFn() { + public String getLeftOpName() { + return this.getOpName() + "-L"; + } + + public String getRightOpName() { + return this.getOpName() + "-R"; + } + + public JoinFunction<K, M, OM, JM> getJoinFn() { return this.joinFn; } public long getTtlMs() { return ttlMs; } - - @Override - public WatermarkFunction getWatermarkFn() { - return joinFn instanceof WatermarkFunction ? 
(WatermarkFunction) joinFn : null; - } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index e67179e..8b2b177 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.serializers.Serde; import org.apache.samza.task.TaskContext; import java.util.ArrayList; @@ -189,18 +190,22 @@ public class OperatorSpecs { * @param leftInputOpSpec the operator spec for the stream on the left side of the join * @param rightInputOpSpec the operator spec for the stream on the right side of the join * @param joinFn the user-defined join function to get join keys and results + * @param keySerde the serde for the join key + * @param messageSerde the serde for messages in the stream on the left side of the join + * @param otherMessageSerde the serde for messages in the stream on the right side of the join * @param ttlMs the ttl in ms for retaining messages in each stream * @param opId the unique ID of the operator * @param <K> the type of join key * @param <M> the type of input message - * @param <JM> the type of message in the other join stream - * @param <RM> the type of join result + * @param <OM> the type of message in the other stream + * @param <JM> the type of join result * @return the {@link JoinOperatorSpec} */ - public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, 
RM> createJoinOperatorSpec( - OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec, - JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) { - return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, ttlMs, opId); + public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec( + OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn, + Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) { + return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, + keySerde, messageSerde, otherMessageSerde, ttlMs, opId); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java new file mode 100644 index 0000000..90dfe59 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.spec; + +import java.util.Collection; + + +/** + * Spec for stateful operators. + */ +public interface StatefulOperatorSpec { + + /** + * Get the store descriptors for stores required by this operator. + * + * @return store descriptors for this operator's stores + */ + Collection<StoreDescriptor> getStoreDescriptors(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java new file mode 100644 index 0000000..8aa2dd9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.spec; + +import org.apache.samza.config.JavaStorageConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.serializers.Serde; + +import java.util.HashMap; +import java.util.Map; + +/** + * A descriptor for a store. + */ +public class StoreDescriptor { + private final String storeName; + private final String storeFactory; + private final Serde keySerde; + private final Serde msgSerde; + private final String changelogStream; + private final Map<String, String> otherProperties; + + StoreDescriptor(String storeName, String storeFactory, Serde keySerde, Serde msgSerde, + String changelogStream, Map<String, String> otherProperties) { + this.storeName = storeName; + this.storeFactory = storeFactory; + this.keySerde = keySerde; + this.msgSerde = msgSerde; + this.changelogStream = changelogStream; + this.otherProperties = otherProperties; + } + + public String getStoreName() { + return storeName; + } + + public Serde getKeySerde() { + return keySerde; + } + + public Serde getMsgSerde() { + return msgSerde; + } + + public JavaStorageConfig getStorageConfigs() { + HashMap<String, String> configs = new HashMap<>(); + configs.put(String.format(StorageConfig.FACTORY(), this.getStoreName()), this.getStoreFactory()); + configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), this.getStoreName()), this.getChangelogStream()); + configs.putAll(this.getOtherProperties()); + return new JavaStorageConfig(new MapConfig(configs)); + } + + private String getStoreFactory() { + return storeFactory; + } + + private String getChangelogStream() { + return changelogStream; + } + + private Map<String, String> getOtherProperties() { + return otherProperties; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 1caca26..ff0299d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -154,7 +154,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { writePlanJsonFile(executionPlanJson); // 2. create the necessary streams - // TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391 + // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391 String planId = String.valueOf(executionPlanJson.hashCode()); createStreams(planId, plan.getIntermediateStreams()); http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 4c4a645..9d44ec1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -216,13 +216,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { public void onNewJobModelAvailable(final String version) { debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> { - LOG.info("pid=" + processorId + "new JobModel available"); + LOG.info("pid=" + processorId + ": new JobModel available"); // get the new job model from ZK newJobModel = zkUtils.getJobModel(version); LOG.info("pid=" + processorId + ": new JobModel available. 
ver=" + version + "; jm = " + newJobModel); if (!newJobModel.getContainers().containsKey(processorId)) { - LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId); + LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}", + processorId, newJobModel); stop(); } else { // stop current work http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index c3e1be2..6071c1f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -805,7 +805,7 @@ class SamzaContainer( */ def shutdown(): Unit = { if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) { - throw new IllegalContainerStateException("Cannot shutdown a container with status - " + status) + throw new IllegalContainerStateException("Cannot shutdown a container with status " + status) } shutdownRunLoop() } @@ -936,16 +936,18 @@ class SamzaContainer( val runLoopThread = Thread.currentThread() shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") { override def run() = { - info("Shutting down, will wait up to %s ms" format shutdownMs) + info("Shutting down, will wait up to %s ms." format shutdownMs) shutdownRunLoop() //TODO: Pull out shutdown hook to LocalContainerRunner or SP try { runLoopThread.join(shutdownMs) } catch { case e: Throwable => // Ignore to avoid deadlock with uncaughtExceptionHandler. See SAMZA-1220 - error("Did not shut down within %s ms, exiting" format shutdownMs, e) + error("Did not shut down within %s ms, exiting." 
format shutdownMs, e) } if (!runLoopThread.isAlive) { info("Shutdown complete") + } else { + error("Did not shut down within %s ms, exiting." format shutdownMs) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java index 95939c4..df393b0 100644 --- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -49,7 +49,9 @@ public class OrderShipmentJoinExample implements StreamApplication { KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class))); orders - .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1)) + .join(shipments, new MyJoinFunction(), + new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), + Duration.ofMinutes(1)) .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) .sendTo(fulfilledOrders); } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 50b0a13..f6441dc 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -31,6 +31,7 @@ import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import 
org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; @@ -130,14 +131,14 @@ public class TestExecutionPlanner { */ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream<KV<Object, Object>> m1 = + MessageStream<KV<Object, Object>> messageStream1 = streamGraph.<KV<Object, Object>>getInputStream("input1") .map(m -> m); - MessageStream<KV<Object, Object>> m2 = + MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value) .filter(m -> true); - MessageStream<KV<Object, Object>> m3 = + MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value) @@ -145,8 +146,14 @@ public class TestExecutionPlanner { OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); - m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1); - m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2); + messageStream1 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) + .sendTo(output1); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + .sendTo(output2); return streamGraph; } @@ -154,14 +161,14 @@ public class TestExecutionPlanner { private StreamGraphImpl createStreamGraphWithJoinAndWindow() { StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream<KV<Object, Object>> m1 = + MessageStream<KV<Object, Object>> 
messageStream1 = streamGraph.<KV<Object, Object>>getInputStream("input1") .map(m -> m); - MessageStream<KV<Object, Object>> m2 = + MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value) .filter(m -> true); - MessageStream<KV<Object, Object>> m3 = + MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value) @@ -169,17 +176,26 @@ public class TestExecutionPlanner { OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); - m1.map(m -> m) + messageStream1.map(m -> m) .filter(m->true) .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8))); - m2.map(m -> m) + messageStream2.map(m -> m) .filter(m->true) .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16))); - m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1); - m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2); - m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(252)).sendTo(output2); + messageStream1 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600)) + .sendTo(output1); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100)) + .sendTo(output2); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252)) + .sendTo(output2); return streamGraph; } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index 095e407..10c4aa3 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -30,6 +30,7 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.codehaus.jackson.map.ObjectMapper; @@ -107,14 +108,14 @@ public class TestJobGraphJsonGenerator { StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); - MessageStream<KV<Object, Object>> m1 = + MessageStream<KV<Object, Object>> messageStream1 = streamGraph.<KV<Object, Object>>getInputStream("input1") .map(m -> m); - MessageStream<KV<Object, Object>> m2 = + MessageStream<KV<Object, Object>> messageStream2 = streamGraph.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value) .filter(m -> true); - MessageStream<KV<Object, Object>> m3 = + MessageStream<KV<Object, Object>> messageStream3 = streamGraph.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value) @@ -122,9 +123,15 @@ public class TestJobGraphJsonGenerator { OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1"); OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2"); - m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1); - m2.sink((message, collector, coordinator) -> { }); - m3.join(m2, mock(JoinFunction.class), 
Duration.ofHours(1)).sendTo(outputStream2); + messageStream1 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) + .sendTo(outputStream1); + messageStream2.sink((message, collector, coordinator) -> { }); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + .sendTo(outputStream2); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); ExecutionPlan plan = planner.plan(streamGraph); http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java index 918da26..f6ebaf9 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@ -26,6 +26,8 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; @@ -35,6 +37,7 @@ import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.junit.Test; +import java.time.Duration; import java.util.Base64; import java.util.HashMap; import java.util.Map; @@ -50,25 +53,35 @@ public class TestJobNode { @Test public void testAddSerdeConfigs() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec inputSpec = new 
StreamSpec("input", "input", "input-system"); + StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system"); + StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system"); StreamSpec outputSpec = new StreamSpec("output", "output", "output-system"); - StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system"); - doReturn(inputSpec).when(mockRunner).getStreamSpec("input"); + StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system"); + doReturn(input1Spec).when(mockRunner).getStreamSpec("input1"); + doReturn(input2Spec).when(mockRunner).getStreamSpec("input2"); doReturn(outputSpec).when(mockRunner).getStreamSpec("output"); - doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1"); + doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2"); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); - MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input"); + MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1"); + MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2"); OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output"); - input.partitionBy(KV::getKey, KV::getValue).sendTo(output); + JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); + input1 + .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value) + .join(input2.map(kv -> kv.value), mockJoinFn, + new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1)) + .sendTo(output); JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class)); Config config = new MapConfig(); - StreamEdge inputEdge = new 
StreamEdge(inputSpec, config); + StreamEdge input1Edge = new StreamEdge(input1Spec, config); + StreamEdge input2Edge = new StreamEdge(input2Spec, config); StreamEdge outputEdge = new StreamEdge(outputSpec, config); StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config); - jobNode.addInEdge(inputEdge); + jobNode.addInEdge(input1Edge); + jobNode.addInEdge(input2Edge); jobNode.addOutEdge(outputEdge); jobNode.addInEdge(repartitionEdge); jobNode.addOutEdge(repartitionEdge); @@ -85,28 +98,74 @@ public class TestJobNode { e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) )); - assertEquals(2, serializers.size()); + assertEquals(5, serializers.size()); // 2 default + 3 specific for join - String inputKeySerde = mapConfig.get("streams.input.samza.key.serde"); - String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde"); - assertTrue(deserializedSerdes.containsKey(inputKeySerde)); - assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue(deserializedSerdes.containsKey(inputMsgSerde)); - assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde"); + String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde"); + assertTrue("Serialized serdes should contain input1 key serde", + deserializedSerdes.containsKey(input1KeySerde)); + assertTrue("Serialized input1 key serde should be a StringSerde", + input1KeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain input1 msg serde", + deserializedSerdes.containsKey(input1MsgSerde)); + assertTrue("Serialized input1 msg serde should be a JsonSerdeV2", + input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde"); + String input2MsgSerde = 
mapConfig.get("streams.input2.samza.msg.serde"); + assertTrue("Serialized serdes should contain input2 key serde", + deserializedSerdes.containsKey(input2KeySerde)); + assertTrue("Serialized input2 key serde should be a StringSerde", + input2KeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain input2 msg serde", + deserializedSerdes.containsKey(input2MsgSerde)); + assertTrue("Serialized input2 msg serde should be a JsonSerdeV2", + input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); String outputKeySerde = mapConfig.get("streams.output.samza.key.serde"); String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde"); - assertTrue(deserializedSerdes.containsKey(outputKeySerde)); - assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue(deserializedSerdes.containsKey(outputMsgSerde)); - assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - - String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde"); - String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde"); - assertTrue(deserializedSerdes.containsKey(partitionByKeySerde)); - assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde)); - assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + assertTrue("Serialized serdes should contain output key serde", + deserializedSerdes.containsKey(outputKeySerde)); + assertTrue("Serialized output key serde should be a StringSerde", + outputKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain output msg serde", + deserializedSerdes.containsKey(outputMsgSerde)); + assertTrue("Serialized output msg serde should be a JsonSerdeV2", + outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String partitionByKeySerde = 
mapConfig.get("streams.null-null-partition_by-2.samza.key.serde"); + String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde"); + assertTrue("Serialized serdes should contain intermediate stream key serde", + deserializedSerdes.containsKey(partitionByKeySerde)); + assertTrue("Serialized intermediate stream key serde should be a StringSerde", + partitionByKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain intermediate stream msg serde", + deserializedSerdes.containsKey(partitionByMsgSerde)); + assertTrue( + "Serialized intermediate stream msg serde should be a JsonSerdeV2", + partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde"); + String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde"); + assertTrue("Serialized serdes should contain left join store key serde", + deserializedSerdes.containsKey(leftJoinStoreKeySerde)); + assertTrue("Serialized left join store key serde should be a StringSerde", + leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain left join store msg serde", + deserializedSerdes.containsKey(leftJoinStoreMsgSerde)); + assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde", + leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); + + String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde"); + String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde"); + assertTrue("Serialized serdes should contain right join store key serde", + deserializedSerdes.containsKey(rightJoinStoreKeySerde)); + assertTrue("Serialized right join store key serde should be a StringSerde", + rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain right join store msg serde", 
+ deserializedSerdes.containsKey(rightJoinStoreMsgSerde)); + assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde", + rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); } } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 0df6721..1120c25 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -22,10 +22,13 @@ import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -34,7 +37,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.container.TaskContextImpl; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.TestClock; @@ -49,6 +51,7 @@ import java.util.Set; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -261,6 +264,9 @@ public class TestJoinOperator { .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + // need to return different stores for left and right side + when(taskContext.getStore(eq("join-4-L"))).thenReturn(new InternalInMemoryStore<>()); + when(taskContext.getStore(eq("join-4-R"))).thenReturn(new InternalInMemoryStore<>()); Config config = mock(Config.class); @@ -287,7 +293,9 @@ public class TestJoinOperator { SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream - .join(inStream2, joinFn, JOIN_TTL) + .join(inStream2, joinFn, + new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new JsonSerdeV2<>(SecondStreamIME.class), + JOIN_TTL) .sink((message, messageCollector, taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); }); http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index c6554bc..f23bb14 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -41,6 +41,7 @@ import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import 
org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -288,7 +289,7 @@ public class TestMessageStreamImpl { mock(JoinFunction.class); Duration joinTtl = Duration.ofMinutes(1); - source1.join(source2, mockJoinFn, joinTtl); + source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl); ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture()); http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index d73c545..a759e52 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -44,6 +44,7 @@ import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.spec.OperatorSpec.OpCode; +import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -218,10 +219,13 @@ public class TestOperatorImplGraph { JoinFunction mockJoinFunction = mock(JoinFunction.class); MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>()); MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>()); - 
inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1)); + inputStream1.join(inputStream2, mockJoinFunction, + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new InternalInMemoryStore<>()); + when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new InternalInMemoryStore<>()); OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); @@ -386,15 +390,22 @@ public class TestOperatorImplGraph { .thenReturn(int2); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - Serde inputSerde = new NoOpSerde<>(); - MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", inputSerde).filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m); - OutputStream<Object> om1 = streamGraph.getOutputStream("output1"); - OutputStream<Object> om2 = streamGraph.getOutputStream("output2"); - - m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha", m -> m).sendTo(om1); - m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2); + MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m -> m); + MessageStream messageStream2 = streamGraph.getInputStream("input2").filter(m -> true); + MessageStream messageStream3 = + streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m); + OutputStream<Object> outputStream1 = streamGraph.getOutputStream("output1"); + OutputStream<Object> outputStream2 = streamGraph.getOutputStream("output2"); + + messageStream1 + .join(messageStream2, 
mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2)) + .partitionBy(m -> "haha", m -> m) + .sendTo(outputStream1); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1)) + .sendTo(outputStream2); Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph); Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
