[ https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736489#comment-14736489 ]

ASF GitHub Bot commented on FLINK-2536:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r39020317
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
         *                      The incoming data
         */
        @Override
    -   public void invoke(IN value) {
    +   public void invoke(IN value) throws Exception {
                byte[] msg = schema.serialize(value);
                try {
                        dataOutputStream.write(msg);
                } catch (IOException e) {
    -                   throw new RuntimeException("Cannot send message " + value.toString() +
    -                                   " to socket server at " + hostName + ":" + port, e);
    +                   LOG.error("Cannot send message " + value.toString() +
    +                                   " to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +                                   ". Trying to reconnect.");
    +                   retries = 0;
    +                   boolean success = false;
    +                   while ((retries < maxRetry || retryForever) && !success && isRunning){
    +                           try {
    +
    +                                   if (dataOutputStream != null) {
    +                                           dataOutputStream.close();
    +                                   }
    +
    +                                   if (client != null && !client.isClosed()) {
    +                                           client.close();
    +                                   }
    +
    +                                   retries++;
    +
    +                                   client = new Socket(hostName, port);
    +                                   dataOutputStream = new DataOutputStream(client.getOutputStream());
    +                                   dataOutputStream.write(msg);
    +                                   success = true;
    +
    +                           } catch(IOException ee) {
    +                                   LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +                                                   ee.toString() + ". Retry time(s):" + retries);
    +
    +                                   if (lock == null) {
    --- End diff --
    
    I see, that is good. For that to be safe, though, the lock should be initialized together with the class instance (in the field declaration) rather than lazily behind a `null` check.
    
    In general, it is good practice to use only final references as locks. If the class needs to be serializable, use the `SerializableObject`.
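
A minimal sketch of that suggestion, under stated assumptions: `RetryingSink` and `LockObject` are hypothetical names for illustration, not the actual `SocketClientSink` code, and `LockObject` only stands in for Flink's `SerializableObject` so that the example compiles without Flink on the classpath. It also assumes the lock is used to wait between reconnect attempts, which the truncated diff does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Flink's SerializableObject:
// an empty serializable class whose instances serve only as monitors.
final class LockObject implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Hypothetical sink skeleton (assumption: not the real SocketClientSink).
class RetryingSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Created in the field declaration and never reassigned, so every
    // thread always synchronizes on the same monitor and no null check is needed.
    private final LockObject lock = new LockObject();

    private volatile boolean isRunning = true;

    // Waits up to `millis` (must be > 0) before the next reconnect attempt;
    // returns immediately if the sink has already been closed.
    void waitBeforeRetry(long millis) throws InterruptedException {
        synchronized (lock) {
            if (isRunning) {
                lock.wait(millis);
            }
        }
    }

    // Stops the sink and wakes up a thread that is waiting to retry.
    void close() {
        synchronized (lock) {
            isRunning = false;
            lock.notifyAll();
        }
    }

    boolean isRunning() {
        return isRunning;
    }
}
```

Because the lock type is serializable, the field is restored when the sink is deserialized, so the final reference is still non-null on the receiving side. A lazily created lock (`if (lock == null) { lock = new Object(); }`) can be created twice by two racing threads, which would then synchronize on different objects.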


> Add a retry for SocketClientSink
> --------------------------------
>
>                 Key: FLINK-2536
>                 URL: https://issues.apache.org/jira/browse/FLINK-2536
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't try to reconnect when it is 
> disconnected from the socket server or gets an exception.
> I'd like to add a reconnect to the socket sink, like the one in the socket source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
