[
https://issues.apache.org/jira/browse/FLINK-4078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336154#comment-15336154
]
ASF GitHub Bot commented on FLINK-4078:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2116#discussion_r67514718
--- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
@@ -334,6 +335,124 @@ public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception
         compareResultAsTuples(result, expected);
     }
+    @Test
+    public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+        /*
+         * CoGroup with multiple key fields, test working closure cleaner for inner classes
+         */
+
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+        DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+                where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+                        Tuple2<Integer, Long>>() {
+                    @Override
+                    public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+                        return new Tuple2<Integer, Long>(t.f0, t.f4);
+                    }
+                }).
+                equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+
+                    @Override
+                    public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+                        return new Tuple2<Integer, Long>(t.f0, t.f1);
+                    }
+                }).
+                with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+                    @Override
+                    public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+                                        Iterable<Tuple3<Integer, Long, String>> second,
+                                        Collector<Tuple3<Integer, Long, String>> out)
+                    {
+                        List<String> strs = new ArrayList<String>();
+
+                        for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+                            strs.add(t.f3);
+                        }
+
+                        for(Tuple3<Integer, Long, String> t : second) {
+                            for(String s : strs) {
+                                out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+                            }
+                        }
+                    }
+                });
+
+        List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+        String expected = "1,1,Hallo\n" +
+                "2,2,Hallo Welt\n" +
+                "3,2,Hallo Welt wie gehts?\n" +
+                "3,2,ABC\n" +
+                "5,3,HIJ\n" +
+                "5,3,IJK\n";
+
+        compareResultAsTuples(result, expected);
+    }
+
+    @Test(expected = InvalidProgramException.class)
--- End diff --
It would make sense to test this more specifically, e.g. wrap the call in
`try { ... } catch (InvalidProgramException e) { }` and check that the root
cause of `e` is a `NotSerializableException`. Otherwise, a closure cleaner
usage flag that is not respected might be hidden by an
`InvalidProgramException` thrown for an unrelated reason.
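
A rough sketch of what such a check could look like. To keep it compilable
without Flink on the classpath, `InvalidProgramException` here is a local
stand-in for Flink's class, and `buildProgramWithoutClosureCleaner()` is a
hypothetical placeholder for the CoGroup set-up with the cleaner disabled;
neither is taken from the pull request.

```java
import java.io.NotSerializableException;

public class RootCauseCheck {

    // Local stand-in for Flink's InvalidProgramException (assumption: only
    // the (message, cause) constructor is needed for this sketch).
    static class InvalidProgramException extends RuntimeException {
        InvalidProgramException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Follows the cause chain to its last element.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical placeholder: simulates the failure we expect when a
    // non-serializable key selector is used with the cleaner disabled.
    static void buildProgramWithoutClosureCleaner() {
        throw new InvalidProgramException("Object not serializable",
                new NotSerializableException("OuterClass"));
    }

    // True only if the program fails AND the failure is a serialization problem.
    static boolean failsWithNotSerializable() {
        try {
            buildProgramWithoutClosureCleaner();
            return false; // no exception at all: the flag was not respected
        } catch (InvalidProgramException e) {
            return rootCause(e) instanceof NotSerializableException;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithNotSerializable());
    }
}
```

In the actual test, the `return false` branch would be a `fail()` call and the
`instanceof` check an assertion, so an `InvalidProgramException` with any
other root cause fails the test instead of passing it.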
> Use ClosureCleaner for CoGroup where
> ------------------------------------
>
> Key: FLINK-4078
> URL: https://issues.apache.org/jira/browse/FLINK-4078
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.3
> Reporter: Ufuk Celebi
> Assignee: Stefan Richter
> Fix For: 1.1.0
>
>
> When specifying a key selector in the where clause of a CoGroup, the closure
> cleaner is not used.
> {code}
> .coGroup(filteredIds)
> .where(new KeySelector<T, String>() {
> @Override
> public String getKey(T t) throws Exception {
> String s = (String) t.get(fieldName);
> return s != null ? s : UUID.randomUUID().toString();
> }
> })
> {code}
> The problem is that the KeySelector is an anonymous inner class and as such
> has a reference to the outer object. Normally, this would be rectified by the
> closure cleaner, but the cleaner is not used in CoGroup.where().
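
The outer reference described in the issue can be reproduced in plain Java,
without Flink. The sketch below is only an illustration: `Selector` is a
hypothetical stand-in for a serializable function interface such as
`KeySelector`, and the anonymous class reads a field of the enclosing
instance purely to make the captured reference explicit. It does not show
what the closure cleaner itself does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterReferenceDemo {

    // Hypothetical stand-in for a serializable function interface.
    interface Selector extends Serializable {
        String getKey(String value);
    }

    // Deliberately NOT Serializable, like a typical job or test class.
    static class Outer {
        private final String fieldName = "id";

        Selector anonymous() {
            // Anonymous inner class: holds a hidden reference to Outer.this,
            // which Java serialization tries to write along with it.
            return new Selector() {
                @Override
                public String getKey(String value) {
                    return fieldName + "=" + value;
                }
            };
        }
    }

    // Static nested class: no reference to any enclosing instance.
    static class StaticSelector implements Selector {
        @Override
        public String getKey(String value) {
            return value;
        }
    }

    // Attempts Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Outer().anonymous()));
        System.out.println(isSerializable(new StaticSelector()));
    }
}
```

The anonymous selector fails to serialize because the enclosing `Outer`
instance is not serializable, while the static nested variant serializes
without problems.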