This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new cb06ae1 SAMZA-2299: StreamTableJoin operator should invoke user join
function only once
cb06ae1 is described below
commit cb06ae13ad5d6f05ab7c0ce0833677fcb01f950a
Author: mynameborat <[email protected]>
AuthorDate: Mon Aug 12 15:04:40 2019 -0700
SAMZA-2299: StreamTableJoin operator should invoke user join function only
once
Author: mynameborat <[email protected]>
Reviewers: Aditya Toomula <[email protected]>
Closes #1135 from mynameborat/SAMZA-2299 and squashes the following commits:
81cc51aa [mynameborat] Address Dengpan's comments
41a91656 [mynameborat] SAMZA-2299: StreamTableJoin operator should invoke
user join function only once
---
.../impl/StreamTableJoinOperatorImpl.java | 7 ++-
.../impl/TestStreamTableJoinOperatorImpl.java | 51 +++++++++++++++++++++-
2 files changed, 52 insertions(+), 6 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 7d74dfc..30347ce 100644
---
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -74,10 +74,9 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM>
extends OperatorImpl<M
}
private Collection<JM> getJoinOutput(K key, Object value, M message) {
- JM output = Optional.ofNullable(value)
- .map(val -> (R) KV.of(key, val))
- .map(record -> joinOpSpec.getJoinFn().apply(message, record))
- .orElseGet(() -> joinOpSpec.getJoinFn().apply(message, null));
+ R record = value == null ? null : (R) KV.of(key, value);
+
+ JM output = joinOpSpec.getJoinFn().apply(message, record);
// The support for inner and outer join will be provided in the jonFn. For
inner join, the joinFn might
// return null, when the corresponding record is absent in the table.
diff --git
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 236fd39..467c0ca 100644
---
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
@@ -34,8 +35,8 @@ import org.junit.Test;
import java.util.Collection;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestStreamTableJoinOperatorImpl {
@@ -94,4 +95,50 @@ public class TestStreamTableJoinOperatorImpl {
Assert.assertEquals(0, result.size());
}
+ /**
+ * Ensure join function is not invoked more than once when join function
returns null on the first invocation
+ */
+ @Test
+ public void testJoinFunctionIsInvokedOnlyOnce() {
+ final String tableId = "testTable";
+ final CountDownLatch joinInvokedLatch = new CountDownLatch(1);
+
+ StreamTableJoinOperatorSpec mockJoinOpSpec =
mock(StreamTableJoinOperatorSpec.class);
+ when(mockJoinOpSpec.getTableId()).thenReturn(tableId);
+ when(mockJoinOpSpec.getArgs()).thenReturn(new Object[0]);
+ when(mockJoinOpSpec.getJoinFn()).thenReturn(
+ new StreamTableJoinFunction<String, KV<String, String>, KV<String,
String>, String>() {
+ @Override
+ public String apply(KV<String, String> message, KV<String, String>
record) {
+ joinInvokedLatch.countDown();
+ return null;
+ }
+
+ @Override
+ public String getMessageKey(KV<String, String> message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getRecordKey(KV<String, String> record) {
+ return record.getKey();
+ }
+ });
+
+ ReadWriteTable table = mock(ReadWriteTable.class);
+
when(table.getAsync("1")).thenReturn(CompletableFuture.completedFuture("r1"));
+
+ Context context = new MockContext();
+ when(context.getTaskContext().getTable(tableId)).thenReturn(table);
+
+ MessageCollector mockMessageCollector = mock(MessageCollector.class);
+ TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+
+ StreamTableJoinOperatorImpl streamTableJoinOperator = new
StreamTableJoinOperatorImpl(mockJoinOpSpec, context);
+
+ // Table has the key
+ streamTableJoinOperator.handleMessage(KV.of("1", "m1"),
mockMessageCollector, mockTaskCoordinator);
+ assertEquals("Join function should only be invoked once", 0,
joinInvokedLatch.getCount());
+ }
+
}