Repository: flink Updated Branches: refs/heads/master ff552b440 -> 243ef69bf
[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections Conclude OuterJoinOperatorBase#executeOnCollections with a call to FunctionUtils.closeFunction(function) in order to close rich user functions. This closes #3453 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/243ef69b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/243ef69b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/243ef69b Branch: refs/heads/master Commit: 243ef69bf5233998dd7f849721cfcb83669b663c Parents: ff552b4 Author: Greg Hogan <[email protected]> Authored: Wed Mar 1 15:55:48 2017 -0500 Committer: Greg Hogan <[email protected]> Committed: Thu Mar 2 10:38:00 2017 -0500 ---------------------------------------------------------------------- .../operators/base/OuterJoinOperatorBase.java | 3 +- .../base/OuterJoinOperatorBaseTest.java | 80 ++++++++++++++++---- .../graph/library/link_analysis/HITSTest.java | 2 +- 3 files changed, 69 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java index a47a612..003fdca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java @@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); FunctionUtils.openFunction(function, this.parameters); - List<OUT> result = new ArrayList<>(); Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig)); @@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector); } + FunctionUtils.closeFunction(function); + return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java index 683e164..9b6c684 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java @@ -19,34 +19,63 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; +import org.junit.Before; import org.junit.Test; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class OuterJoinOperatorBaseTest implements Serializable { - private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() { - @Override - public void join(String first, String second, Collector<String> out) throws Exception { - out.collect(String.valueOf(first) + ',' + String.valueOf(second)); - } - }; + private MockRichFlatJoinFunction joiner; + + private OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator; + + private ExecutionConfig executionConfig; + + private RuntimeContext runtimeContext; @SuppressWarnings({"rawtypes", "unchecked"}) - private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator = + @Before + public void setup() { + joiner = new MockRichFlatJoinFunction(); + + baseOperator = new OuterJoinOperatorBase(joiner, - new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + + executionConfig = new ExecutionConfig(); + + String taskName = "Test rich outer join function"; + TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0); + HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>(); + HashMap<String, Future<Path>> cpTasks = new HashMap<>(); + + runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()); + } @Test public void testFullOuterJoinWithoutMatchingPartners() throws Exception { @@ -132,18 +161,41 @@ public class OuterJoinOperatorBaseTest implements Serializable { baseOperator.setOuterJoinType(null); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); } private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception { - ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); executionConfig.enableObjectReuse(); - List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); + + assertTrue(joiner.opened.get()); + assertTrue(joiner.closed.get()); } -} \ No newline at end of file + private static class MockRichFlatJoinFunction extends RichFlatJoinFunction<String, String, String> { + final AtomicBoolean opened = new AtomicBoolean(false); + final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void open(Configuration parameters) throws Exception { + opened.compareAndSet(false, true); + assertEquals(0, getRuntimeContext().getIndexOfThisSubtask()); + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void close() throws Exception{ + closed.compareAndSet(false, true); + } + + @Override + public void join(String first, String second, Collector<String> out) throws Exception { + out.collect(String.valueOf(first) + ',' + String.valueOf(second)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/243ef69b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java index 2e5cebe..b09f95c 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -80,7 +80,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { DataSet<Result<LongValue>> hits = directedRMatGraph - .run(new HITS<LongValue, NullValue, NullValue>(0.000001)); + .run(new HITS<LongValue, NullValue, NullValue>(1)); Checksum checksum = new ChecksumHashCode<Result<LongValue>>() .run(hits)
