[ 
https://issues.apache.org/jira/browse/FLINK-4078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336154#comment-15336154
 ] 

ASF GitHub Bot commented on FLINK-4078:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2116#discussion_r67514718
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 ---
    @@ -334,6 +335,124 @@ public void 
testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception
                compareResultAsTuples(result, expected);
        }
     
    +   @Test
    +   public void 
testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() 
throws Exception {
    +           /*
    +            * CoGroup with multiple key fields, test working closure 
cleaner for inner classes
    +            */
    +
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = 
CollectionDataSets.get5TupleDataSet(env);
    +           DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
    +
    +           DataSet<Tuple3<Integer, Long, String>> coGrouped = 
ds1.coGroup(ds2).
    +                           where(new KeySelector<Tuple5<Integer, Long, 
Integer, String, Long>,
    +                                           Tuple2<Integer, Long>>() {
    +                                   @Override
    +                                   public Tuple2<Integer, Long> 
getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
    +                                           return new Tuple2<Integer, 
Long>(t.f0, t.f4);
    +                                   }
    +                           }).
    +                           equalTo(new 
KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
    +
    +                                   @Override
    +                                   public Tuple2<Integer, Long> 
getKey(Tuple3<Integer,Long,String> t) {
    +                                           return new Tuple2<Integer, 
Long>(t.f0, t.f1);
    +                                   }
    +                           }).
    +                           with(new CoGroupFunction<Tuple5<Integer, Long, 
Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>>() {
    +                                   @Override
    +                                   public void 
coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
    +                                                       
Iterable<Tuple3<Integer, Long, String>> second,
    +                                                       
Collector<Tuple3<Integer, Long, String>> out)
    +                                   {
    +                                           List<String> strs = new 
ArrayList<String>();
    +
    +                                           for (Tuple5<Integer, Long, 
Integer, String, Long> t : first) {
    +                                                   strs.add(t.f3);
    +                                           }
    +
    +                                           for(Tuple3<Integer, Long, 
String> t : second) {
    +                                                   for(String s : strs) {
    +                                                           out.collect(new 
Tuple3<Integer, Long, String>(t.f0, t.f1, s));
    +                                                   }
    +                                           }
    +                                   }
    +                           });
    +
    +           List<Tuple3<Integer, Long, String>> result = 
coGrouped.collect();
    +
    +           String expected = "1,1,Hallo\n" +
    +                           "2,2,Hallo Welt\n" +
    +                           "3,2,Hallo Welt wie gehts?\n" +
    +                           "3,2,ABC\n" +
    +                           "5,3,HIJ\n" +
    +                           "5,3,IJK\n";
    +
    +           compareResultAsTuples(result, expected);
    +   }
    +
    +   @Test(expected = InvalidProgramException.class)
    --- End diff --
    
    Would make sense to test this in a more specific way, e.g. wrap in `try { 
... } catch (InvalidProgramException e) { }` and check that the root cause of 
`e` is `NotSerializableException`. Otherwise, a not respected closure cleaner 
usage flag might be hidden by another exception. 


> Use ClosureCleaner for CoGroup where
> ------------------------------------
>
>                 Key: FLINK-4078
>                 URL: https://issues.apache.org/jira/browse/FLINK-4078
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Ufuk Celebi
>            Assignee: Stefan Richter
>             Fix For: 1.1.0
>
>
> When specifying a key selector in the where clause of a CoGroup, the closure 
> cleaner is not used.
> {code}
> .coGroup(filteredIds)
>                 .where(new KeySelector<T, String>() {
>                     @Override
>                     public String getKey(T t) throws Exception {
>                         String s = (String) t.get(fieldName);
>                         return s != null ? s : UUID.randomUUID().toString();
>                     }
>                 })
> {code}
> The problem is that the KeySelector is an anonymous inner class and as such 
> as a reference to the outer object. Normally, this would be rectified by the 
> closure cleaner but the cleaner is not used in CoGroup.where().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to