Hi Samrat,
I copied the thread because I wasn't subscribed previously. I used the core node IP address and the core host name; the behavior is identical.

1. Flink 1.5.2
2. I don't get any exceptions. The logs are at TRACE level and show no relevant errors. I attached the jobmanager logs; the error log is empty.
3. The source generates records and succeeds in writing them. The destination doesn't read any records.

The socket tries to connect to the private DNS name ip-10-0-102-186.ec2.internal or to 10.0.102.186.

The reader IP enumeration is:

    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 172.18.0.1
    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 172.17.0.1
    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 10.0.102.186
    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 0:0:0:0:0:0:0:1%lo
    2023-02-21 10:17:53,170 INFO org.example.WindowWordCount [] - Address 2 127.0.0.1

The writer IP enumeration is:

    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 172.18.0.1
    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 172.17.0.1
    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 10.0.102.186
    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 0:0:0:0:0:0:0:1%lo
    2023-02-21 10:17:58,809 INFO org.example.Main [] - Address 2 127.0.0.1

Hi Itay,

`writeToSocket` supports passing the hostname, port and schema as arguments. I think you can pass the hostname of one of the core nodes in the Flink job where you intend to view the messages. Have you tried that?

Can you add more details:

1. Which Flink version are you using?
2. If you are getting an exception, can you add the error logs?
3. Have you verified that your source is generating new records while you are trying to view records on the socket?
Bests,
Samrat

Hi,

I built a streaming pipeline with two jobs that I wish to connect through a socket (later we plan to use Kafka instead). The jobs are submitted to an AWS EMR cluster with this configuration:

    {
      "Classification": "flink-conf",
      "Properties": {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk",
        "env.java.home": "/usr/lib/jvm/java-11-openjdk",
        "high-availability": "zookeeper",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "java.home": "/usr/lib/jvm/java-11-openjdk",
        "taskmanager.data.port": "35001",
        "taskmanager.numberOfTaskSlots": "2",
        "yarn.application-attempts": "10"
      }
    },

I am probably missing how to define the host when I use writeToSocket and socketTextStream from my jobs. Is it a configuration setting, or the IP of one of the primary/core nodes? I have tried many options and none of the messages went through.

Any help would be appreciated.

Itay Sali
<<attachment: jobmanager-logs-socket.zip>>
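
For anyone trying to reproduce the two checks described in this thread (listing the addresses a node sees, and testing whether a host and port accept a TCP connection), here is a minimal sketch in plain Java. It is not the code from the jobs above and uses no Flink API; the class and method names (`SocketDiagnostics`, `listAddresses`, `canConnect`) are hypothetical, and the host, port and timeout are whatever you pass in.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper, not part of Flink: standard-library checks only.
final class SocketDiagnostics {

    // Collects the textual form of every address bound to a local interface.
    static List<String> listAddresses() throws SocketException {
        List<String> addresses = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return addresses; // no interfaces reported on this machine
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                addresses.add(address.getHostAddress());
            }
        }
        return addresses;
    }

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat all as "not reachable".
            return false;
        }
    }

    // Usage: java SocketDiagnostics [host port]
    public static void main(String[] args) throws SocketException {
        for (String address : listAddresses()) {
            System.out.println("Address " + address);
        }
        if (args.length == 2) {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            System.out.println(host + ":" + port + " reachable: "
                    + canConnect(host, port, 3000));
        }
    }
}
```

A successful probe only shows that something is listening on that host and port at that moment; it does not show that either job is connected to it. A failed probe from the node running the reader or writer suggests checking whether any process is listening there and whether the port is open between the nodes.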