[
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709237#comment-14709237
]
ASF GitHub Bot commented on FLINK-2536:
---------------------------------------
Github user HuangWHWHW commented on a diff in the pull request:
https://github.com/apache/flink/pull/1030#discussion_r37748350
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ private final String host = "127.0.0.1";
+ private int port;
+ private String value;
+
+ public Thread t;
+
+ public SocketClientSinkTest() {
+ }
+
+ @Test
+ public void testSocketSink() throws Exception{
+ value = "";
+ ServerSocket server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ t = Thread.currentThread();
+ SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+ };
+
+ try {
+ SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ simpleSink.open(new Configuration());
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ } catch (Exception e){
+ error.set(e);
+ }
+ }
+ }).start();
+
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+ .getInputStream()));
+ value = rdr.readLine();
+
+ t.join();
+ server.close();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testSocketSinkInvoke", value);
+ }
+
+ @Test
+ public void testSocketSinkNoRetry() throws Exception{
+ ServerSocket server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ t = Thread.currentThread();
+ SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+ };
+
+ try {
+ SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ simpleSink.open(new Configuration());
+
+ synchronized (t) {
+ //waiting for server to close
+ t.wait();
+ }
+
+ //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ //socket is closed then test "retry"
+ simpleSink.invoke("testSocketSinkInvoke");
--- End diff --
The first call is needed because, once the server has closed its side, the
server is still waiting for a FIN message from the client (state: FIN_WAIT_2).
The first call to invoke makes the server really close the connection.
The second call then makes the sink retry, because the server is closed.
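
The behaviour described above can be observed outside Flink with plain
sockets. The following is only a sketch: HalfCloseDemo and firstFailingWrite
are hypothetical names, and which write fails depends on the operating system
and on timing (the short sleep gives the peer's reset time to arrive). On a
typical TCP stack the first write after the server closes is still accepted
locally, and a later write fails.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Flink or of the pull request.
class HalfCloseDemo {

    /**
     * Connects to a local server that closes the connection immediately,
     * then writes repeatedly. Returns the 1-based index of the first write
     * that throws, or -1 if none of the maxWrites attempts fails.
     */
    static int firstFailingWrite(int maxWrites) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {

            // Server side: accept the connection and close it right away.
            server.accept().close();

            OutputStream out = client.getOutputStream();
            byte[] line = "testSocketSinkInvoke\n".getBytes(StandardCharsets.UTF_8);
            for (int i = 1; i <= maxWrites; i++) {
                try {
                    out.write(line);
                    out.flush();
                } catch (IOException e) {
                    return i; // the connection is fully torn down by now
                }
                // Assumption: 100 ms is enough for the peer's reset to arrive.
                Thread.sleep(100);
            }
            return -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("first failing write: " + firstFailingWrite(10));
    }
}
```

This is why the test calls invoke twice: a single call after the server has
closed would normally not fail, so the retry path would not be exercised.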
> Add a retry for SocketClientSink
> --------------------------------
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I found that the SocketClientSink does not reconnect when it is disconnected
> from the socket server or gets an exception.
> I'd like to add a reconnect to the socket sink, like the one in the socket source.
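
The reconnect requested in the issue could look roughly like the sketch
below. It is an illustration under stated assumptions, not the code from the
pull request: RetryingSocketWriter, maxRetries and retryDelayMillis are
hypothetical names, and the real SocketClientSink may handle retries
differently.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

// Hypothetical helper, not the SocketClientSink implementation.
class RetryingSocketWriter {
    private final String host;
    private final int port;
    private final int maxRetries;
    private final long retryDelayMillis;
    private Socket socket;

    RetryingSocketWriter(String host, int port, int maxRetries, long retryDelayMillis) {
        this.host = host;
        this.port = port;
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
    }

    /**
     * Writes data to the server, reconnecting after a failure.
     * Returns the number of retries that were needed (0 on first success).
     */
    int send(byte[] data) throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                if (socket == null || socket.isClosed()) {
                    socket = new Socket(host, port); // (re)connect lazily
                }
                OutputStream out = socket.getOutputStream();
                out.write(data);
                out.flush();
                return attempt;
            } catch (IOException e) {
                last = e;
                close(); // drop the broken connection before retrying
                try {
                    Thread.sleep(retryDelayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw new IOException("giving up after " + maxRetries + " retries", last);
    }

    /** Closes the current connection, ignoring errors on close. */
    void close() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if close itself fails
            }
            socket = null;
        }
    }
}
```

With maxRetries set to 0 the helper fails on the first error, which matches
the "no retry" case exercised by testSocketSinkNoRetry in the diff above.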