This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6685b28  SAMZA-2490: AsyncFlatmapOperatorImpl#handleClose should call 
transformFn.close() (#1320)
6685b28 is described below

commit 6685b28b5e744ba102675c01fb135059446ab583
Author: Yixing Zhang <[email protected]>
AuthorDate: Thu Mar 19 10:52:43 2020 -0700

    SAMZA-2490: AsyncFlatmapOperatorImpl#handleClose should call 
transformFn.close() (#1320)
---
 .../operators/impl/AsyncFlatmapOperatorImpl.java   |  1 +
 .../impl/TestAsyncFlatmapOperatorImpl.java         | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
index fa5e56a..bc55472 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
@@ -49,6 +49,7 @@ public class AsyncFlatmapOperatorImpl<M, RM> extends 
OperatorImpl<M, RM> {
 
   @Override
   protected void handleClose() {
+    this.transformFn.close();
   }
 
   @Override
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java
new file mode 100644
index 0000000..9e8f755
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestAsyncFlatmapOperatorImpl {
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAsyncFlatMapOperator() {
+    AsyncFlatMapOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> 
mockOp = mock(AsyncFlatMapOperatorSpec.class);
+    AsyncFlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> 
txfmFn = mock(AsyncFlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+    AsyncFlatmapOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> 
opImpl =
+        new AsyncFlatmapOperatorImpl<>(mockOp);
+    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
+    Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
+    when(txfmFn.apply(inMsg)).thenReturn(CompletableFuture.supplyAsync(() -> 
mockOutputs));
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    Collection<TestOutputMessageEnvelope> results = opImpl
+        .handleMessage(inMsg, mockCollector, mockCoordinator);
+    verify(txfmFn, times(1)).apply(inMsg);
+    assertEquals(results, mockOutputs);
+  }
+
+  @Test
+  public void testAsyncFlatMapOperatorClose() {
+    AsyncFlatMapOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> 
mockOp = mock(AsyncFlatMapOperatorSpec.class);
+    AsyncFlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> 
txfmFn = mock(AsyncFlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+
+    AsyncFlatmapOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> 
opImpl =
+        new AsyncFlatmapOperatorImpl<>(mockOp);
+
+    // ensure that close is not called yet
+    verify(txfmFn, times(0)).close();
+    opImpl.handleClose();
+    // ensure that close is called once inside handleClose()
+    verify(txfmFn, times(1)).close();
+  }
+}

Reply via email to