Repository: flink Updated Branches: refs/heads/release-1.1 ba5aa10b9 -> 01703e60e
[FLINK-5945] [core] Close function in OuterJoinOperatorBase#executeOnCollections Conclude OuterJoinOperatorBase#executeOnCollections with a call to FunctionUtils.closeFunction(function) in order to close rich user functions. This closes #3453 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01703e60 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01703e60 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01703e60 Branch: refs/heads/release-1.1 Commit: 01703e60e0b583d6d32c2cba395f6199c5773c5e Parents: ba5aa10 Author: Greg Hogan <[email protected]> Authored: Wed Mar 1 15:55:48 2017 -0500 Committer: Greg Hogan <[email protected]> Committed: Thu Mar 2 09:42:22 2017 -0500 ---------------------------------------------------------------------- .../operators/base/OuterJoinOperatorBase.java | 3 +- .../base/OuterJoinOperatorBaseTest.java | 88 ++++++++++++++++---- .../graph/library/link_analysis/HITSTest.java | 2 +- 3 files changed, 73 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java index a47a612..003fdca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java @@ -103,7 +103,6 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); FunctionUtils.openFunction(function, this.parameters); - List<OUT> result = new ArrayList<>(); Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig)); @@ -113,6 +112,8 @@ public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector); } + FunctionUtils.closeFunction(function); + return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java index 69159f2..143a416 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java @@ -18,34 +18,63 @@ package org.apache.flink.api.common.operators.base; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; +import org.junit.Before; import org.junit.Test; +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class OuterJoinOperatorBaseTest implements Serializable { - private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() { - @Override - public void join(String first, String second, Collector<String> out) throws Exception { - out.collect(String.valueOf(first) + ',' + String.valueOf(second)); - } - }; + private MockRichFlatJoinFunction joiner; + + private OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator; + + private ExecutionConfig executionConfig; + + private RuntimeContext runtimeContext; @SuppressWarnings({"rawtypes", "unchecked"}) - private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator = + @Before + public void setup() { + joiner = new MockRichFlatJoinFunction(); + + baseOperator = new OuterJoinOperatorBase(joiner, - new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + + executionConfig = new ExecutionConfig(); + + String taskName = "Test rich outer join function"; + TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); + HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<>(); + HashMap<String, Future<Path>> cpTasks = new HashMap<>(); + + runtimeContext = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()); + } @Test public void testFullOuterJoinWithoutMatchingPartners() throws Exception { @@ -131,18 +160,41 @@ public class OuterJoinOperatorBaseTest implements Serializable { baseOperator.setOuterJoinType(null); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); } private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception { - ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); executionConfig.enableObjectReuse(); - List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, runtimeContext, executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); + + assertTrue(joiner.opened.get()); + assertTrue(joiner.closed.get()); } -} \ No newline at end of file + private static class MockRichFlatJoinFunction extends RichFlatJoinFunction<String, String, String> { + final AtomicBoolean opened = new AtomicBoolean(false); + final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void open(Configuration parameters) throws Exception { + opened.compareAndSet(false, true); + assertEquals(0, getRuntimeContext().getIndexOfThisSubtask()); + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void close() throws Exception{ + closed.compareAndSet(false, true); + } + + @Override + public void join(String first, String second, Collector<String> out) throws Exception { + out.collect(String.valueOf(first) + ',' + String.valueOf(second)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/01703e60/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java index 1551d84..590fc0e 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -80,7 +80,7 @@ extends AsmTestBase { public void testWithRMatGraph() throws Exception { ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new HITS<LongValue, NullValue, NullValue>(0.000001))); + .run(new HITS<LongValue, NullValue, NullValue>(1))); assertEquals(902, checksum.getCount()); assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());
