This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new cb06ae1  SAMZA-2299: StreamTableJoin operator should invoke user join function only once
cb06ae1 is described below

commit cb06ae13ad5d6f05ab7c0ce0833677fcb01f950a
Author: mynameborat <[email protected]>
AuthorDate: Mon Aug 12 15:04:40 2019 -0700

    SAMZA-2299: StreamTableJoin operator should invoke user join function only once
    
    Author: mynameborat <[email protected]>
    
    Reviewers: Aditya Toomula <[email protected]>
    
    Closes #1135 from mynameborat/SAMZA-2299 and squashes the following commits:
    
    81cc51aa [mynameborat] Address Dengpan's comments
    41a91656 [mynameborat] SAMZA-2299: StreamTableJoin operator should invoke user join function only once
---
 .../impl/StreamTableJoinOperatorImpl.java          |  7 ++-
 .../impl/TestStreamTableJoinOperatorImpl.java      | 51 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 6 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 7d74dfc..30347ce 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -74,10 +74,9 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> 
extends OperatorImpl<M
   }
 
   private Collection<JM> getJoinOutput(K key, Object value, M message) {
-    JM output = Optional.ofNullable(value)
-        .map(val -> (R) KV.of(key, val))
-        .map(record -> joinOpSpec.getJoinFn().apply(message, record))
-        .orElseGet(() -> joinOpSpec.getJoinFn().apply(message, null));
+    R record = value == null ? null : (R) KV.of(key, value);
+
+    JM output = joinOpSpec.getJoinFn().apply(message, record);
 
     // The support for inner and outer join will be provided in the jonFn. For 
inner join, the joinFn might
     // return null, when the corresponding record is absent in the table.
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 236fd39..467c0ca 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import junit.framework.Assert;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
@@ -34,8 +35,8 @@ import org.junit.Test;
 
 import java.util.Collection;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestStreamTableJoinOperatorImpl {
@@ -94,4 +95,50 @@ public class TestStreamTableJoinOperatorImpl {
     Assert.assertEquals(0, result.size());
   }
 
+  /**
+   * Ensure join function is invoked exactly once per message, even when it returns null on the first invocation
+   */
+  @Test
+  public void testJoinFunctionIsInvokedOnlyOnce() {
+    final String tableId = "testTable";
+    // Start at 2: countDown() is a no-op at zero, so a latch of 1 cannot tell one call from several.
+    final CountDownLatch joinInvokedLatch = new CountDownLatch(2);
+
+    StreamTableJoinOperatorSpec mockJoinOpSpec = mock(StreamTableJoinOperatorSpec.class);
+    when(mockJoinOpSpec.getTableId()).thenReturn(tableId);
+    when(mockJoinOpSpec.getArgs()).thenReturn(new Object[0]);
+    when(mockJoinOpSpec.getJoinFn()).thenReturn(
+        new StreamTableJoinFunction<String, KV<String, String>, KV<String, String>, String>() {
+          @Override
+          public String apply(KV<String, String> message, KV<String, String> record) {
+            joinInvokedLatch.countDown();
+            return null;
+          }
+
+          @Override
+          public String getMessageKey(KV<String, String> message) {
+            return message.getKey();
+          }
+
+          @Override
+          public String getRecordKey(KV<String, String> record) {
+            return record.getKey();
+          }
+        });
+
+    ReadWriteTable table = mock(ReadWriteTable.class);
+    when(table.getAsync("1")).thenReturn(CompletableFuture.completedFuture("r1"));
+
+    Context context = new MockContext();
+    when(context.getTaskContext().getTable(tableId)).thenReturn(table);
+
+    MessageCollector mockMessageCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+
+    StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(mockJoinOpSpec, context);
+    // Table has the key; a remaining count of 1 means exactly one invocation (2 = never called, 0 = called twice+)
+    streamTableJoinOperator.handleMessage(KV.of("1", "m1"), mockMessageCollector, mockTaskCoordinator);
+    assertEquals("Join function should only be invoked once", 1, joinInvokedLatch.getCount());
+  }
+
 }

Reply via email to