This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6685b28 SAMZA-2490: AsyncFlatmapOperatorImpl#handleClose should call transformFn.close() (#1320)
6685b28 is described below
commit 6685b28b5e744ba102675c01fb135059446ab583
Author: Yixing Zhang <[email protected]>
AuthorDate: Thu Mar 19 10:52:43 2020 -0700
SAMZA-2490: AsyncFlatmapOperatorImpl#handleClose should call transformFn.close() (#1320)
---
.../operators/impl/AsyncFlatmapOperatorImpl.java | 1 +
.../impl/TestAsyncFlatmapOperatorImpl.java | 75 ++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
index fa5e56a..bc55472 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/AsyncFlatmapOperatorImpl.java
@@ -49,6 +49,7 @@ public class AsyncFlatmapOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
@Override
protected void handleClose() {
+ this.transformFn.close(); // SAMZA-2490: forward close to the transform function held by this operator
}
@Override
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java
new file mode 100644
index 0000000..9e8f755
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestAsyncFlatmapOperatorImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestAsyncFlatmapOperatorImpl { // unit tests for AsyncFlatmapOperatorImpl message handling and close
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAsyncFlatMapOperator() {
+    AsyncFlatMapOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(AsyncFlatMapOperatorSpec.class);
+    AsyncFlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(AsyncFlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+    AsyncFlatmapOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
+        new AsyncFlatmapOperatorImpl<>(mockOp);
+    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
+    Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
+    when(txfmFn.apply(inMsg)).thenReturn(CompletableFuture.supplyAsync(() -> mockOutputs));
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    Collection<TestOutputMessageEnvelope> results = opImpl
+        .handleMessage(inMsg, mockCollector, mockCoordinator);
+    verify(txfmFn, times(1)).apply(inMsg);
+    assertEquals(mockOutputs, results); // JUnit argument order is (expected, actual)
+  }
+
+  @Test
+  public void testAsyncFlatMapOperatorClose() {
+    AsyncFlatMapOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(AsyncFlatMapOperatorSpec.class);
+    AsyncFlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(AsyncFlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+
+    AsyncFlatmapOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
+        new AsyncFlatmapOperatorImpl<>(mockOp);
+
+    // ensure that close is not called yet
+    verify(txfmFn, times(0)).close();
+    opImpl.handleClose();
+    // ensure that close is called once inside handleClose()
+    verify(txfmFn, times(1)).close();
+  }
+}