Repository: flink
Updated Branches:
  refs/heads/master ff552b440 -> 243ef69bf


[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections

Conclude OuterJoinOperatorBase#executeOnCollections with a call to
FunctionUtils.closeFunction(function) in order to close rich user
functions.

This closes #3453


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/243ef69b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/243ef69b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/243ef69b

Branch: refs/heads/master
Commit: 243ef69bf5233998dd7f849721cfcb83669b663c
Parents: ff552b4
Author: Greg Hogan <[email protected]>
Authored: Wed Mar 1 15:55:48 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Thu Mar 2 10:38:00 2017 -0500

----------------------------------------------------------------------
 .../operators/base/OuterJoinOperatorBase.java   |  3 +-
 .../base/OuterJoinOperatorBaseTest.java         | 80 ++++++++++++++++----
 .../graph/library/link_analysis/HITSTest.java   |  2 +-
 3 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index a47a612..003fdca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT 
extends FlatJoinFunction<IN
                FunctionUtils.setFunctionRuntimeContext(function, 
runtimeContext);
                FunctionUtils.openFunction(function, this.parameters);
 
-
                List<OUT> result = new ArrayList<>();
                Collector<OUT> collector = new CopyingListCollector<>(result, 
outInformation.createSerializer(executionConfig));
 
@@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT 
extends FlatJoinFunction<IN
                        function.join(left == null ? null : 
leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), 
collector);
                }
 
+               FunctionUtils.closeFunction(function);
+
                return result;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 683e164..9b6c684 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -19,34 +19,63 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class OuterJoinOperatorBaseTest implements Serializable {
 
-       private final FlatJoinFunction<String, String, String> joiner = new 
FlatJoinFunction<String, String, String>() {
-               @Override
-               public void join(String first, String second, Collector<String> 
out) throws Exception {
-                       out.collect(String.valueOf(first) + ',' + 
String.valueOf(second));
-               }
-       };
+       private MockRichFlatJoinFunction joiner;
+
+       private OuterJoinOperatorBase<String, String, String, 
FlatJoinFunction<String, String, String>> baseOperator;
+
+       private ExecutionConfig executionConfig;
+
+       private RuntimeContext runtimeContext;
 
        @SuppressWarnings({"rawtypes", "unchecked"})
-       private final OuterJoinOperatorBase<String, String, String, 
FlatJoinFunction<String, String, String>> baseOperator =
+       @Before
+       public void setup() {
+               joiner = new MockRichFlatJoinFunction();
+
+               baseOperator =
                        new OuterJoinOperatorBase(joiner,
-                                       new 
BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO,
-                                                       
BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+                               new 
BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO,
+                                       BasicTypeInfo.STRING_TYPE_INFO), new 
int[0], new int[0], "TestJoiner", null);
+
+               executionConfig = new ExecutionConfig();
+
+               String taskName = "Test rich outer join function";
+               TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
+               HashMap<String, Accumulator<?, ?>> accumulatorMap = new 
HashMap<>();
+               HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+               runtimeContext = new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks,
+                       accumulatorMap, new UnregisteredMetricsGroup());
+       }
 
        @Test
        public void testFullOuterJoinWithoutMatchingPartners() throws Exception 
{
@@ -132,18 +161,41 @@ public class OuterJoinOperatorBaseTest implements 
Serializable {
                baseOperator.setOuterJoinType(null);
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.disableObjectReuse();
-               baseOperator.executeOnCollections(leftInput, rightInput, null, 
executionConfig);
+               baseOperator.executeOnCollections(leftInput, rightInput, 
runtimeContext, executionConfig);
        }
 
        private void testOuterJoin(List<String> leftInput, List<String> 
rightInput, List<String> expected) throws Exception {
-               ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.disableObjectReuse();
-               List<String> resultSafe = 
baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+               List<String> resultSafe = 
baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, 
executionConfig);
                executionConfig.enableObjectReuse();
-               List<String> resultRegular = 
baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+               List<String> resultRegular = 
baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, 
executionConfig);
 
                assertEquals(expected, resultSafe);
                assertEquals(expected, resultRegular);
+
+               assertTrue(joiner.opened.get());
+               assertTrue(joiner.closed.get());
        }
 
-}
\ No newline at end of file
+       private static class MockRichFlatJoinFunction extends 
RichFlatJoinFunction<String, String, String> {
+               final AtomicBoolean opened = new AtomicBoolean(false);
+               final AtomicBoolean closed = new AtomicBoolean(false);
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       opened.compareAndSet(false, true);
+                       assertEquals(0, 
getRuntimeContext().getIndexOfThisSubtask());
+                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+               }
+
+               @Override
+               public void close() throws Exception{
+                       closed.compareAndSet(false, true);
+               }
+
+               @Override
+               public void join(String first, String second, Collector<String> 
out) throws Exception {
+                       out.collect(String.valueOf(first) + ',' + 
String.valueOf(second));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 2e5cebe..b09f95c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -80,7 +80,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Result<LongValue>> hits = directedRMatGraph
-                       .run(new HITS<LongValue, NullValue, 
NullValue>(0.000001));
+                       .run(new HITS<LongValue, NullValue, NullValue>(1));
 
                Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
                        .run(hits)

Reply via email to