[ 
https://issues.apache.org/jira/browse/FLINK-21578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300863#comment-17300863
 ] 

Yun Gao commented on FLINK-21578:
---------------------------------

Hi [~kezhuw], sorry, I may not have fully understood the issue. Are you saying 
that allocating resources in `createXX` is problematic?

For example, I think the pattern would look like this:
{code:java}
class XXSink implements Sink {
   
   ...
   
   XXCommitter createCommitter() {
        ResourceA a = new ResourceA();
        ResourceB b = new ResourceB();
        return new XXCommitter(a, b);
   }
}

class XXCommitter implements Committer {
    private final ResourceA a;
    private final ResourceB b;     
    
    public XXCommitter(ResourceA a, ResourceB b) {
        ...
    }

    void commit(...) {
        ...
    }
    
    @Override
    public void close() {
        a.close();
        b.close();
    }
}

{code}

> Closeable Sink Committer/GlobalCommitter were created to function in one step 
> during job graph composition
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21578
>                 URL: https://issues.apache.org/jira/browse/FLINK-21578
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.13.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> Normally, functions/operators are created in the job graph composition phase for 
> serialization and transmission. They are then "opened" in the Flink cluster to 
> function. This two-step procedure works well because it leaves no 
> resource-cleanup requirement in the job graph composition phase.
> However, {{Committer}} and {{GlobalCommitter}} have no such "open" operation, yet 
> they are created in the job graph composition phase.
> The following are fixes I could imagine if we converge on "this is problematic".
>  # Add an {{open}} or similar method to these two classes.
>  # Add {{hasCommitter}}, {{hasGlobalCommitter}} to {{Sink}} and make 
> {{createCommitter}} and others non-optional (enforce this at runtime).
> Personally, I slightly prefer the second approach, since it may touch fewer code 
> paths in the job graph composition phase. But the first approach has the advantage 
> that it could be a non-breaking change.
> There might be other approaches though.
> cc [~guoweima] [~gaoyunhaii]  [~aljoscha]  [~kkl0u]
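
To make the first proposed fix in the quoted description concrete, here is a minimal, self-contained sketch of a committer whose resources are allocated in an explicit open step rather than at creation time. It does not use Flink's actual interfaces: `OpenableCommitter`, `LazyCommitter`, `TrackedResource`, and `LazyOpenDemo` are hypothetical names introduced only for illustration, and the `open()` method is an assumption about what such an API might look like, not something the issue has settled on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an external resource (connection, client, ...).
final class TrackedResource implements AutoCloseable {
    // Number of resources currently allocated and not yet closed.
    static int liveCount = 0;

    TrackedResource() {
        liveCount++;
    }

    @Override
    public void close() {
        liveCount--;
    }
}

// Hypothetical contract with an explicit open step; NOT Flink's Committer interface.
interface OpenableCommitter<CommT> extends AutoCloseable {
    void open() throws Exception;

    // Returns the committables that should be retried (empty if none).
    List<CommT> commit(List<CommT> committables) throws Exception;
}

final class LazyCommitter implements OpenableCommitter<String> {
    // Stays null until open() runs, so creating the committer allocates nothing.
    private TrackedResource resource;

    @Override
    public void open() {
        resource = new TrackedResource();
    }

    @Override
    public List<String> commit(List<String> committables) {
        if (resource == null) {
            throw new IllegalStateException("commit() called before open()");
        }
        return new ArrayList<>(); // this sketch never asks for a retry
    }

    @Override
    public void close() {
        if (resource != null) {
            resource.close();
            resource = null;
        }
    }
}

final class LazyOpenDemo {
    public static void main(String[] args) {
        LazyCommitter committer = new LazyCommitter(); // "graph composition": no resource yet
        System.out.println("live after create: " + TrackedResource.liveCount);
        committer.open();                              // "runtime": resource allocated here
        System.out.println("live after open: " + TrackedResource.liveCount);
        committer.commit(Collections.singletonList("c1"));
        committer.close();
        System.out.println("live after close: " + TrackedResource.liveCount);
    }
}
```

With this shape, an instance that is created during composition and then discarded holds nothing that needs cleanup, which is the property the two-step procedure described above provides for ordinary functions and operators.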



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
