Hi Samrat,

I copied the thread as I wasn't subscribed previously.

I used both the core node IP address and the core node host name, and the results are identical.

  1.  Flink 1.5.2
  2.  I don't get any exceptions. With the logs at TRACE level there are no 
relevant errors. I attached the jobmanager logs; the error log is empty.
  3.  The source generates records and writes them successfully. The 
destination doesn't read any records.

The socket tries to connect to the private DNS name ip-10-0-102-186.ec2.internal 
or to 10.0.102.186.
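
A connection attempt to that address can only succeed if some process is already listening on the chosen port. The following is a minimal sketch, using only the Java standard library, of a check that can be run from another node to see whether the port accepts connections. The class name PortCheck, the method name canConnect and the 2000 ms timeout are hypothetical and not part of the jobs discussed in this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port can be opened
    // within timeoutMillis, false on refusal, timeout or any other I/O error.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck.java <host> <port>
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 2000));
    }
}
```

If this reports false for the host and port passed to the jobs, the problem is at the network or listener level rather than in the job logic.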

The reader's IP address enumeration is:
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 172.18.0.1
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 172.17.0.1
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 10.0.102.186
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 0:0:0:0:0:0:0:1%lo
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount [] - Address 2 127.0.0.1

The writer's IP address enumeration is:
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 172.18.0.1
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 172.17.0.1
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 10.0.102.186
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 0:0:0:0:0:0:0:1%lo
2023-02-21 10:17:58,809 INFO  org.example.Main [] - Address 2 127.0.0.1
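
For reference, an enumeration like the two above can be produced with java.net.NetworkInterface. This is only a minimal sketch and not the code that ran in the jobs; the class name AddressLister and the method name listAddresses are hypothetical, and the output format is simplified compared to the log lines.

```java
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

class AddressLister {
    // Collects the textual form of every address bound to every local interface.
    static List<String> listAddresses() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return result; // some platforms report "no interfaces" as null
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    result.add(addr.getHostAddress());
                }
            }
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        for (String address : listAddresses()) {
            System.out.println("Address " + address);
        }
    }
}
```

Identical output from both jobs, as in the logs above, suggests that the reader and the writer see the same set of network interfaces.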



Hi Itay,

`writeToSocket` supports passing the hostname, port and schema as arguments. 
I think you can pass the hostname of one of the core nodes in the Flink job 
in which you intend to view the messages. Have you tried that?

Can you add more details:

  1.  Which Flink version are you using?
  2.  If you are getting an exception, can you add the error logs?
  3.  Have you verified that your source is generating new records when you 
are trying to view records in the socket?

Bests,
Samrat



Hi,

I built a streaming pipeline, and there are two jobs that I wish to connect 
with a socket (later we plan to use Kafka instead). The jobs are submitted to 
an AWS EMR cluster with this configuration:

{
  "Classification": "flink-conf",
  "Properties": {
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk",
    "env.java.home": "/usr/lib/jvm/java-11-openjdk",
    "high-availability": "zookeeper",
    "high-availability.storageDir": "hdfs:///user/flink/recovery",
    "high-availability.zookeeper.path.root": "/flink",
    "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
    "java.home": "/usr/lib/jvm/java-11-openjdk",
    "taskmanager.data.port": "35001",
    "taskmanager.numberOfTaskSlots": "2",
    "yarn.application-attempts": "10"
  }
}

I am probably missing how to define the host when I use writeToSocket and 
socketTextStream from my jobs. Is it a configuration setting, or should it be 
one of the primary/core node IPs? I have tried many options and none of the 
messages went through. Any help would be appreciated.






Itay Sali

<<attachment: jobmanager-logs-socket.zip>>

