This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 709706a SAMZA-2116: Making sendTo(table), sendTo(stream) non-terminal
709706a is described below
commit 709706a7807536c1561f63de0fe75db92721db3e
Author: Sanil15 <[email protected]>
AuthorDate: Mon Apr 22 10:43:48 2019 -0700
SAMZA-2116: Making sendTo(table), sendTo(stream) non-terminal
Author: Sanil15 <[email protected]>
Reviewers: Bharath Kumarasubramaniam <[email protected]>, Prateek
Maheshwari <[email protected]>
Closes #984 from Sanil15/SAMZA-2116
---
.../org/apache/samza/operators/MessageStream.java | 24 ++++++++++++++-----
.../apache/samza/operators/MessageStreamImpl.java | 6 +++--
.../samza/operators/impl/OutputOperatorImpl.java | 11 ++++-----
.../operators/impl/SendToTableOperatorImpl.java | 12 ++++------
.../samza/operators/spec/OutputOperatorSpec.java | 4 +---
.../operators/spec/SendToTableOperatorSpec.java | 2 +-
.../StreamApplicationIntegrationTest.java | 28 +++++++++++++++++-----
7 files changed, 56 insertions(+), 31 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 141a4d2..451b925 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -112,7 +112,7 @@ public interface MessageStream<M> {
* Offers more control over processing and sending messages than {@link
#sendTo(OutputStream)} since
* the {@link SinkFunction} has access to the {@link
org.apache.samza.task.MessageCollector} and
* {@link org.apache.samza.task.TaskCoordinator}.
- * <p>
+ *
* This can also be used to send output to a system (e.g. a database) that
doesn't have a corresponding
* Samza SystemProducer implementation.
*
@@ -121,14 +121,19 @@ public interface MessageStream<M> {
void sink(SinkFunction<? super M> sinkFn);
/**
- * Allows sending messages in this {@link MessageStream} to an {@link
OutputStream}.
+ * Allows sending messages in this {@link MessageStream} to an {@link
OutputStream} and then propagates this
+ * {@link MessageStream} to the next chained operator.
* <p>
* When sending messages to an {@code OutputStream<KV<K, V>>}, messages are
partitioned using their serialized key.
* When sending messages to any other {@code OutputStream<M>}, messages are
partitioned using a null partition key.
+ * <p>
+ * Note: The message will be written but not flushed to the underlying
output system before it is propagated to the
+ * chained operators. Messages retain the original partitioning scheme when
propagated to the next operator.
*
* @param outputStream the output stream to send messages to
+ * @return a {@link MessageStream} of the same messages, to which further operators can be chained
*/
- void sendTo(OutputStream<M> outputStream);
+ MessageStream<M> sendTo(OutputStream<M> outputStream);
/**
* Groups the messages in this {@link MessageStream} according to the
provided {@link Window} semantics
@@ -275,14 +280,21 @@ public interface MessageStream<M> {
MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde,
String id);
/**
- * Sends messages in this {@link MessageStream} to a {@link Table}. The type
of input message is expected
- * to be {@link KV}, otherwise a {@link ClassCastException} will be thrown.
+ * Allows sending messages in this {@link MessageStream} to a {@link Table}
and then propagates this
+ * {@link MessageStream} to the next chained operator. The type of input
message is expected to be {@link KV},
+ * otherwise a {@link ClassCastException} will be thrown.
+ * <p>
+ * Note: The message will be written but may not be flushed to the
underlying table before it is propagated to the
+ * chained operators. Whether the message can be read back from the Table in
the chained operator depends on whether
+ * it was flushed and whether the Table offers read-after-write consistency.
Messages retain the original partitioning
+ * scheme when propagated to the next operator.
*
* @param table the table to write messages to
* @param <K> the type of key in the table
* @param <V> the type of record value in the table
+ * @return a {@link MessageStream} of the same {@link KV} messages, to which further operators can be chained
*/
- <K, V> void sendTo(Table<KV<K, V>> table);
+ <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
/**
* Broadcasts messages in this {@link MessageStream} to all instances of its
downstream operators..
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 99cc81a..6110ed1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -120,11 +120,12 @@ public class MessageStreamImpl<M> implements
MessageStream<M> {
}
@Override
- public void sendTo(OutputStream<M> outputStream) {
+ public MessageStream<M> sendTo(OutputStream<M> outputStream) {
String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
(OutputStreamImpl<M>) outputStream, opId);
this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
@@ -188,11 +189,12 @@ public class MessageStreamImpl<M> implements
MessageStream<M> {
}
@Override
- public <K, V> void sendTo(Table<KV<K, V>> table) {
+ public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table) {
String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
SendToTableOperatorSpec<K, V> op =
OperatorSpecs.createSendToTableOperatorSpec(((TableImpl)
table).getTableId(), opId);
this.operatorSpec.registerNextOperatorSpec(op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index 566485a..8f34bc5 100644
---
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.impl;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.context.Context;
@@ -29,15 +30,13 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
-
import java.util.Collection;
-import java.util.Collections;
/**
* An operator that sends incoming messages to an output {@link SystemStream}.
*/
-class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
+class OutputOperatorImpl<M> extends OperatorImpl<M, M> {
private final OutputOperatorSpec<M> outputOpSpec;
private final OutputStreamImpl<M> outputStream;
@@ -54,7 +53,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
}
@Override
- protected CompletionStage<Collection<Void>> handleMessageAsync(M message,
MessageCollector collector,
+ protected CompletionStage<Collection<M>> handleMessageAsync(M message,
MessageCollector collector,
TaskCoordinator coordinator) {
Object key, value;
if (outputStream.isKeyed()) {
@@ -66,7 +65,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
}
collector.send(new OutgoingMessageEnvelope(systemStream, null, key,
value));
- return CompletableFuture.completedFuture(Collections.emptyList());
+ return CompletableFuture.completedFuture(Collections.singleton(message));
}
@Override
@@ -74,7 +73,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
}
@Override
- protected OperatorSpec<M, Void> getOperatorSpec() {
+ protected OperatorSpec<M, M> getOperatorSpec() {
return outputOpSpec;
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 1197b37..f3aba5b 100644
---
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.impl;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.context.Context;
@@ -27,9 +28,7 @@ import
org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
-
import java.util.Collection;
-import java.util.Collections;
/**
@@ -39,7 +38,7 @@ import java.util.Collections;
* @param <K> the type of the record key
* @param <V> the type of the record value
*/
-public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>,
Void> {
+public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>,
KV<K, V>> {
private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
private final ReadWriteTable<K, V> table;
@@ -54,11 +53,10 @@ public class SendToTableOperatorImpl<K, V> extends
OperatorImpl<KV<K, V>, Void>
}
@Override
- protected CompletionStage<Collection<Void>> handleMessageAsync(KV<K, V>
message, MessageCollector collector,
+ protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V>
message, MessageCollector collector,
TaskCoordinator coordinator) {
table.put(message.getKey(), message.getValue());
- // there should be no further chained operators since this is a terminal
operator.
- return CompletableFuture.completedFuture(Collections.emptyList());
+ return CompletableFuture.completedFuture(Collections.singleton(message));
}
@Override
@@ -67,7 +65,7 @@ public class SendToTableOperatorImpl<K, V> extends
OperatorImpl<KV<K, V>, Void>
}
@Override
- protected OperatorSpec<KV<K, V>, Void> getOperatorSpec() {
+ protected OperatorSpec<KV<K, V>, KV<K, V>> getOperatorSpec() {
return sendToTableOpSpec;
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index d6238b8..f8660e2 100644
---
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -25,12 +25,10 @@ import
org.apache.samza.operators.functions.WatermarkFunction;
/**
* The spec for an operator that outputs a {@link
org.apache.samza.operators.MessageStream} to a
* {@link org.apache.samza.system.SystemStream}.
- * <p>
- * This is a terminal operator and does not allow further operator chaining.
*
* @param <M> the type of input message
*/
-public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
+public class OutputOperatorSpec<M> extends OperatorSpec<M, M> {
private final OutputStreamImpl<M> outputStream;
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index 1e44fa5..02248ae 100644
---
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -31,7 +31,7 @@ import org.apache.samza.operators.functions.WatermarkFunction;
* @param <V> the type of the table record value
*/
@InterfaceStability.Unstable
-public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>,
Void> {
+public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>,
KV<K, V>> {
private final String tableId;
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index d9dfacb..5e25817 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -50,10 +50,13 @@ import
org.apache.samza.test.table.PageViewToProfileJoinFunction;
import org.apache.samza.test.table.TestTableData;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.samza.test.controlmessages.TestData.PageView;
public class StreamApplicationIntegrationTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamApplicationIntegrationTest.class);
private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk",
"group", "job"};
@@ -81,14 +84,20 @@ public class StreamApplicationIntegrationTest {
InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc
= isd
.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
+ InMemoryOutputDescriptor<String> joinKeysDescriptor = isd
+ .getOutputDescriptor("JoinPageKeys", new NoOpSerde<>());
+
TestRunner
.of(new PageViewProfileViewJoinApplication())
.addInputStream(pageViewStreamDesc, pageViews)
.addInputStream(profileStreamDesc, profiles)
.addOutputStream(outputStreamDesc, 1)
+ .addOutputStream(joinKeysDescriptor, 1)
.run(Duration.ofSeconds(2));
+
Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc,
Duration.ofSeconds(1)).get(0).size());
+ Assert.assertEquals(10, TestRunner.consumeStream(joinKeysDescriptor,
Duration.ofSeconds(1)).get(0).size());
}
@Test
@@ -149,21 +158,28 @@ public class StreamApplicationIntegrationTest {
KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new
JsonSerdeV2<>()));
- appDescriptor
- .getInputStream(profileISD)
- .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
- .sendTo(table);
-
KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new
JsonSerdeV2<>()));
KafkaOutputDescriptor<TestTableData.EnrichedPageView>
enrichedPageViewOSD =
ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());
+ appDescriptor.getInputStream(profileISD)
+ .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
+ .sendTo(table)
+ .sink((kv, collector, coordinator) -> {
+ LOG.info("Inserted Profile with Key: {} in profile-view-store",
kv.getKey());
+ });
+
OutputStream<TestTableData.EnrichedPageView> outputStream =
appDescriptor.getOutputStream(enrichedPageViewOSD);
appDescriptor.getInputStream(pageViewISD)
.partitionBy(pv -> pv.getValue().getMemberId(), pv ->
pv.getValue(), KVSerde.of(new IntegerSerde(), new
JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
.join(table, new PageViewToProfileJoinFunction())
- .sendTo(outputStream);
+ .sendTo(outputStream)
+ .map(TestTableData.EnrichedPageView::getPageKey)
+ .sink((joinPageKey, collector, coordinator) -> {
+ collector.send(new OutgoingMessageEnvelope(new
SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
+ });
+
}
}