[
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706434#comment-14706434
]
ASF GitHub Bot commented on FLINK-2536:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1030#discussion_r37617867
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
---
@@ -73,13 +84,44 @@ public void intializeConnection() {
* The incoming data
*/
@Override
- public void invoke(IN value) {
+ public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
- throw new RuntimeException("Cannot send message " +
value.toString() +
- " to socket server at " + hostName +
":" + port, e);
+ retries = 0;
+ boolean success = false;
+ while ((retries < maxRetry || retryForever) && !success
&& isRunning){
+ try {
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null &&
!client.isClosed()) {
+ client.close();
+ }
+
+ if (!retryForever){
+ retries++;
+ }
+
+ client = new Socket(hostName, port);
+ dataOutputStream = new
DataOutputStream(client.getOutputStream());
+ dataOutputStream.write(msg);
+ success = true;
+
+ }catch(Exception ee){
--- End diff --
Well, it actually depends on the exceptions for which we want to restart.
If I'm not mistaken, then `new Socket()` and `dataOutputStream.write` only
throw `IOExceptions`. Thus, we should change it.
> Add a retry for SocketClientSink
> --------------------------------
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found the SocketClientSink doesn`t use a re-connect when disconnect from
> the socket server or get exception.
> I`d like to add a re-connect like socket source for socket sink.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)