[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709320#comment-14709320
 ] 

ASF GitHub Bot commented on FLINK-2536:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37752742
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
    @@ -73,13 +85,49 @@ public void intializeConnection() {
         *                      The incoming data
         */
        @Override
    -   public void invoke(IN value) {
    +   public void invoke(IN value) throws Exception {
                byte[] msg = schema.serialize(value);
                try {
                        dataOutputStream.write(msg);
                } catch (IOException e) {
    -                   throw new RuntimeException("Cannot send message " + 
value.toString() +
    -                                   " to socket server at " + hostName + 
":" + port, e);
    +                   Log.error("Cannot send message " + value.toString() +
    +                                   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
    +                                   ". Trying to reconnect.");
    +                   retries = 0;
    +                   boolean success = false;
    +                   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
    +                           try {
    +
    +                                   if (dataOutputStream != null) {
    +                                           dataOutputStream.close();
    +                                   }
    +
    +                                   if (client != null && 
!client.isClosed()) {
    +                                           client.close();
    +                                   }
    +
    +                                   if (!retryForever){
    +                                           retries++;
    +                                   }
    +
    +                                   client = new Socket(hostName, port);
    +                                   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
    +                                   dataOutputStream.write(msg);
    +                                   success = true;
    +
    +                           } catch(IOException ee) {
    +                                   Log.error("Reconnect to socket server 
and send message failed. Caused by " +
    +                                                           ee.toString() + 
". Retry time(s):" + retries);
    +                                   synchronized (this) {
    +                                           
this.wait(CONNECTION_RETRY_SLEEP);
    --- End diff --
    
    This can throw an `InterruptedException`. Should be treated by checking the 
termination criterion for example.


> Add a retry for SocketClientSink
> --------------------------------
>
>                 Key: FLINK-2536
>                 URL: https://issues.apache.org/jira/browse/FLINK-2536
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from 
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to