[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725223#comment-14725223
 ] 

ASF GitHub Bot commented on FLINK-2536:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408757
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
         *                      The incoming data
         */
        @Override
    -   public void invoke(IN value) {
    +   public void invoke(IN value) throws Exception {
                byte[] msg = schema.serialize(value);
                try {
                        dataOutputStream.write(msg);
                } catch (IOException e) {
    -                   throw new RuntimeException("Cannot send message " + 
value.toString() +
    -                                   " to socket server at " + hostName + 
":" + port, e);
    +                   LOG.error("Cannot send message " + value.toString() +
    +                                   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
    +                                   ". Trying to reconnect.");
    +                   retries = 0;
    +                   boolean success = false;
    +                   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
    +                           try {
    +
    +                                   if (dataOutputStream != null) {
    +                                           dataOutputStream.close();
    +                                   }
    +
    +                                   if (client != null && 
!client.isClosed()) {
    +                                           client.close();
    +                                   }
    +
    +                                   retries++;
    +
    +                                   client = new Socket(hostName, port);
    +                                   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
    +                                   dataOutputStream.write(msg);
    +                                   success = true;
    +
    +                           } catch(IOException ee) {
    +                                   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
    +                                                   ee.toString() + ". 
Retry time(s):" + retries);
    +
    +                                   if (lock == null) {
    +                                           lock = new Object();
    +                                   }
    +
    +                                   try {
    +                                           synchronized (lock) {
    +                                                   
lock.wait(CONNECTION_RETRY_SLEEP);
    +                                           }
    +                                   } catch(InterruptedException eee) {
    +                                           LOG.error(eee.toString());
    --- End diff --
    
    I think there is no need to log this. Interrupting the thread usually means 
that it has been shut down.
    
    If you want to log exceptions, log them via ("message", exeption), here 
`LOG.error("Reconnect delay interrupted", e);`


> Add a retry for SocketClientSink
> --------------------------------
>
>                 Key: FLINK-2536
>                 URL: https://issues.apache.org/jira/browse/FLINK-2536
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to