This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 709706a  SAMZA-2116: Making sendTo(table), sendTo(stream) non-terminal
709706a is described below

commit 709706a7807536c1561f63de0fe75db92721db3e
Author: Sanil15 <[email protected]>
AuthorDate: Mon Apr 22 10:43:48 2019 -0700

    SAMZA-2116: Making sendTo(table), sendTo(stream) non-terminal
    
    Author: Sanil15 <[email protected]>
    
    Reviewers: Bharath Kumarasubramaniam <[email protected]>, Prateek Maheshwari <[email protected]>
    
    Closes #984 from Sanil15/SAMZA-2116
---
 .../org/apache/samza/operators/MessageStream.java  | 24 ++++++++++++++-----
 .../apache/samza/operators/MessageStreamImpl.java  |  6 +++--
 .../samza/operators/impl/OutputOperatorImpl.java   | 11 ++++-----
 .../operators/impl/SendToTableOperatorImpl.java    | 12 ++++------
 .../samza/operators/spec/OutputOperatorSpec.java   |  4 +---
 .../operators/spec/SendToTableOperatorSpec.java    |  2 +-
 .../StreamApplicationIntegrationTest.java          | 28 +++++++++++++++++-----
 7 files changed, 56 insertions(+), 31 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 141a4d2..451b925 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -112,7 +112,7 @@ public interface MessageStream<M> {
    * Offers more control over processing and sending messages than {@link 
#sendTo(OutputStream)} since
    * the {@link SinkFunction} has access to the {@link 
org.apache.samza.task.MessageCollector} and
    * {@link org.apache.samza.task.TaskCoordinator}.
-   * <p>
+   *
    * This can also be used to send output to a system (e.g. a database) that 
doesn't have a corresponding
    * Samza SystemProducer implementation.
    *
@@ -121,14 +121,19 @@ public interface MessageStream<M> {
   void sink(SinkFunction<? super M> sinkFn);
 
   /**
-   * Allows sending messages in this {@link MessageStream} to an {@link 
OutputStream}.
+   * Allows sending messages in this {@link MessageStream} to an {@link OutputStream} and then propagates this
+   * {@link MessageStream} to the next chained operator.
    * <p>
    * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are 
partitioned using their serialized key.
    * When sending messages to any other {@code OutputStream<M>}, messages are 
partitioned using a null partition key.
+   * <p>
+   * Note: The message will be written but not flushed to the underlying output system before it is propagated to the
+   * chained operators. Messages retain the original partitioning scheme when propagated to the next operator.
    *
    * @param outputStream the output stream to send messages to
+   * @return a new {@link MessageStream} that emits the same messages, for chaining further operators
    */
-  void sendTo(OutputStream<M> outputStream);
+  MessageStream<M> sendTo(OutputStream<M> outputStream);
 
   /**
    * Groups the messages in this {@link MessageStream} according to the 
provided {@link Window} semantics
@@ -275,14 +280,21 @@ public interface MessageStream<M> {
       MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, 
String id);
 
   /**
-   * Sends messages in this {@link MessageStream} to a {@link Table}. The type 
of input message is expected
-   * to be {@link KV}, otherwise a {@link ClassCastException} will be thrown.
+   * Allows sending messages in this {@link MessageStream} to a {@link Table} 
and then propagates this
+   * {@link MessageStream} to the next chained operator. The type of input 
message is expected to be {@link KV},
+   * otherwise a {@link ClassCastException} will be thrown.
+   * <p>
+   * Note: The message will be written but may not be flushed to the underlying table before it is propagated to the
+   * chained operators. Whether the message can be read back from the Table in the chained operator depends on whether
+   * it was flushed and whether the Table offers read-after-write consistency. Messages retain the original partitioning
+   * scheme when propagated to the next operator.
    *
    * @param table the table to write messages to
    * @param <K> the type of key in the table
    * @param <V> the type of record value in the table
+   * @return a new {@link MessageStream} that emits the same messages, for chaining further operators
    */
-  <K, V> void sendTo(Table<KV<K, V>> table);
+  <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table);
 
   /**
    * Broadcasts messages in this {@link MessageStream} to all instances of its 
downstream operators..
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 99cc81a..6110ed1 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -120,11 +120,12 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   }
 
   @Override
-  public void sendTo(OutputStream<M> outputStream) {
+  public MessageStream<M> sendTo(OutputStream<M> outputStream) {
     String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
         (OutputStreamImpl<M>) outputStream, opId);
     this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
@@ -188,11 +189,12 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   }
 
   @Override
-  public <K, V> void sendTo(Table<KV<K, V>> table) {
+  public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table) {
     String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
     SendToTableOperatorSpec<K, V> op =
         OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) 
table).getTableId(), opId);
     this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.streamAppDesc, op);
   }
 
   @Override
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index 566485a..8f34bc5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import org.apache.samza.context.Context;
@@ -29,15 +30,13 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
-
 import java.util.Collection;
-import java.util.Collections;
 
 
 /**
  * An operator that sends incoming messages to an output {@link SystemStream}.
  */
-class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
+class OutputOperatorImpl<M> extends OperatorImpl<M, M> {
 
   private final OutputOperatorSpec<M> outputOpSpec;
   private final OutputStreamImpl<M> outputStream;
@@ -54,7 +53,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected CompletionStage<Collection<Void>> handleMessageAsync(M message, 
MessageCollector collector,
+  protected CompletionStage<Collection<M>> handleMessageAsync(M message, 
MessageCollector collector,
       TaskCoordinator coordinator) {
     Object key, value;
     if (outputStream.isKeyed()) {
@@ -66,7 +65,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
     }
 
     collector.send(new OutgoingMessageEnvelope(systemStream, null, key, 
value));
-    return CompletableFuture.completedFuture(Collections.emptyList());
+    return CompletableFuture.completedFuture(Collections.singleton(message));
   }
 
   @Override
@@ -74,7 +73,7 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected OperatorSpec<M, Void> getOperatorSpec() {
+  protected OperatorSpec<M, M> getOperatorSpec() {
     return outputOpSpec;
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 1197b37..f3aba5b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import org.apache.samza.context.Context;
@@ -27,9 +28,7 @@ import 
org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
-
 import java.util.Collection;
-import java.util.Collections;
 
 
 /**
@@ -39,7 +38,7 @@ import java.util.Collections;
  * @param <K> the type of the record key
  * @param <V> the type of the record value
  */
-public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, 
Void> {
+public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, 
KV<K, V>> {
 
   private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
   private final ReadWriteTable<K, V> table;
@@ -54,11 +53,10 @@ public class SendToTableOperatorImpl<K, V> extends 
OperatorImpl<KV<K, V>, Void>
   }
 
   @Override
-  protected CompletionStage<Collection<Void>> handleMessageAsync(KV<K, V> 
message, MessageCollector collector,
+  protected CompletionStage<Collection<KV<K, V>>> handleMessageAsync(KV<K, V> 
message, MessageCollector collector,
       TaskCoordinator coordinator) {
     table.put(message.getKey(), message.getValue());
-    // there should be no further chained operators since this is a terminal 
operator.
-    return CompletableFuture.completedFuture(Collections.emptyList());
+    return CompletableFuture.completedFuture(Collections.singleton(message));
   }
 
   @Override
@@ -67,7 +65,7 @@ public class SendToTableOperatorImpl<K, V> extends 
OperatorImpl<KV<K, V>, Void>
   }
 
   @Override
-  protected OperatorSpec<KV<K, V>, Void> getOperatorSpec() {
+  protected OperatorSpec<KV<K, V>, KV<K, V>> getOperatorSpec() {
     return sendToTableOpSpec;
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index d6238b8..f8660e2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -25,12 +25,10 @@ import 
org.apache.samza.operators.functions.WatermarkFunction;
 /**
  * The spec for an operator that outputs a {@link 
org.apache.samza.operators.MessageStream} to a
  * {@link org.apache.samza.system.SystemStream}.
- * <p>
- * This is a terminal operator and does not allow further operator chaining.
  *
  * @param <M>  the type of input message
  */
-public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
+public class OutputOperatorSpec<M> extends OperatorSpec<M, M> {
 
   private final OutputStreamImpl<M> outputStream;
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index 1e44fa5..02248ae 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -31,7 +31,7 @@ import org.apache.samza.operators.functions.WatermarkFunction;
  * @param <V> the type of the table record value
  */
 @InterfaceStability.Unstable
-public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, 
Void> {
+public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, 
KV<K, V>> {
 
   private final String tableId;
 
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index d9dfacb..5e25817 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -50,10 +50,13 @@ import 
org.apache.samza.test.table.PageViewToProfileJoinFunction;
 import org.apache.samza.test.table.TestTableData;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.samza.test.controlmessages.TestData.PageView;
 
 public class StreamApplicationIntegrationTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamApplicationIntegrationTest.class);
 
   private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", 
"group", "job"};
 
@@ -81,14 +84,20 @@ public class StreamApplicationIntegrationTest {
     InMemoryOutputDescriptor<TestTableData.EnrichedPageView> outputStreamDesc 
= isd
         .getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
 
+    InMemoryOutputDescriptor<String> joinKeysDescriptor = isd
+        .getOutputDescriptor("JoinPageKeys", new NoOpSerde<>());
+
     TestRunner
         .of(new PageViewProfileViewJoinApplication())
         .addInputStream(pageViewStreamDesc, pageViews)
         .addInputStream(profileStreamDesc, profiles)
         .addOutputStream(outputStreamDesc, 1)
+        .addOutputStream(joinKeysDescriptor, 1)
         .run(Duration.ofSeconds(2));
 
+
     Assert.assertEquals(10, TestRunner.consumeStream(outputStreamDesc, 
Duration.ofSeconds(1)).get(0).size());
+    Assert.assertEquals(10, TestRunner.consumeStream(joinKeysDescriptor, 
Duration.ofSeconds(1)).get(0).size());
   }
 
   @Test
@@ -149,21 +158,28 @@ public class StreamApplicationIntegrationTest {
       KafkaInputDescriptor<KV<String, TestTableData.Profile>> profileISD =
           ksd.getInputDescriptor("Profile", KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
 
-      appDescriptor
-          .getInputStream(profileISD)
-          .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
-          .sendTo(table);
-
       KafkaInputDescriptor<KV<String, TestTableData.PageView>> pageViewISD =
           ksd.getInputDescriptor("PageView", KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
       KafkaOutputDescriptor<TestTableData.EnrichedPageView> 
enrichedPageViewOSD =
           ksd.getOutputDescriptor("EnrichedPageView", new JsonSerdeV2<>());
 
+      appDescriptor.getInputStream(profileISD)
+          .map(m -> new KV(m.getValue().getMemberId(), m.getValue()))
+          .sendTo(table)
+          .sink((kv, collector, coordinator) -> {
+              LOG.info("Inserted Profile with Key: {} in profile-view-store", 
kv.getKey());
+            });
+
       OutputStream<TestTableData.EnrichedPageView> outputStream = 
appDescriptor.getOutputStream(enrichedPageViewOSD);
       appDescriptor.getInputStream(pageViewISD)
           .partitionBy(pv -> pv.getValue().getMemberId(),  pv -> 
pv.getValue(), KVSerde.of(new IntegerSerde(), new 
JsonSerdeV2<>(TestTableData.PageView.class)), "p1")
           .join(table, new PageViewToProfileJoinFunction())
-          .sendTo(outputStream);
+          .sendTo(outputStream)
+          .map(TestTableData.EnrichedPageView::getPageKey)
+          .sink((joinPageKey, collector, coordinator) -> {
+              collector.send(new OutgoingMessageEnvelope(new 
SystemStream("test", "JoinPageKeys"), null, null, joinPageKey));
+            });
+
     }
   }
 

Reply via email to