Samrat002 opened a new pull request, #23718:
URL: https://github.com/apache/flink/pull/23718

   
   
   ## What is the purpose of the change
   
   Fix the error due to IOException: Stream closed
   
   Calling persist() in case of filesystem is not syncable 
   
   Persist() behaves in same manner as sync()
   Reference:  
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.html#persist--
   
   
   
   ## Verifying this change
   
   SQL Client Logs 
   ```
   ider [] - Connecting to ResourceManager at 
ip-172-xx-xx-xxx.us-west-2.compute.internal/172.xx.xx.xxx:8032
   2023-11-15 08:34:34,824 INFO  org.apache.hadoop.yarn.client.AHSProxy         
              [] - Connecting to Application History server at 
ip-172-xx-xx-xxx.us-west-2.compute.internal/172.xx.xx.xxx:10200
   2023-11-15 08:34:34,824 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - No path for the flink jar passed. Using the location of 
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2023-11-15 08:34:34,824 WARN  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR 
environment variable is set.The Flink YARN Client needs one of these to be set 
to properly load the Hadoop configuration for accessing YARN.
   2023-11-15 08:34:34,837 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - FoundINSERT INTO dummy_table SELECT * FROM (VALUES (1, 
'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'Wor1'.
   ld'), (4, 'ADD'), (5, 'LINE'));
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] SQL update statement has been successfully submitted to the cluster:
   Job ID: c190ea1d8613c4c730513a7221b5a3a4
   
   
   Flink SQL> select * from dummy_table;2023-11-15 08:35:41,397 INFO  
org.apache.hadoop.yarn.client.DefaultNoHARMFailoverProxyProvider [] - 
Connecting to ResourceManager at 
ip-172-xx-xx-xxx.us-west-2.compute.internal/172.xx.xx.xxx:8032
   2023-11-15 08:35:41,398 INFO  org.apache.hadoop.yarn.client.AHSProxy         
              [] - Connecting to Application History server at 
ip-172-xx-xx-xxx.us-west-2.compute.internal/172.xx.xx.xxx:10200
   2023-11-15 08:35:41,398 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - No path for the flink jar passed. Using the location of 
class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2023-11-15 08:35:41,399 WARN  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR 
environment variable is set.The Flink YARN Client needs one of these to be set 
to properly load the Hadoop configuration for accessing YARN.
   2023-11-15 08:35:41,408 INFO  org.apache.flink.yarn.YarnClusterDescriptor    
              [] - Found Web Interface 
ip-172-xx-xx-xxx.us-west-2.compute.internal:43691 of application 
'application_1700033066610_0001'.
   
   [INFO] Result retrieval cancelled.
   
   Flink SQL> exit;
   [INFO] Exiting Flink SQL CLI Client...
   
   Shutting down the session...
   done.
   [hadoop@ip-172-xx-xx-xxx bin]$ aws s3 ls s3://prabhuflinks3/dummy_table/
   2023-11-15 07:11:01         55 
_part-43aaa2ae-e726-4012-9df9-b83e385de83d-0-0_tmp_91a055f5-5f9f-45b8-99c0-6f4ba0dfa8dd
   2023-11-15 07:14:04         55 
_part-8a36d67c-1593-49b8-965c-2c5063e46a9e-0-0_tmp_e2b85c2b-bec3-4d9c-a574-433888573752
   2023-11-15 08:34:38         55 part-2d6489e9-569a-4a90-bae1-fe63991be6c4-0-0
   2023-11-15 07:11:00         55 part-43aaa2ae-e726-4012-9df9-b83e385de83d-0-0
   2023-11-15 07:14:03         55 part-8a36d67c-1593-49b8-965c-2c5063e46a9e-0-0
   [hadoop@ip-172-xx-xx-xxx bin]$
   ```
   
   reading data from s3 path 
   ![Screenshot 2023-11-15 at 2 06 07 
PM](https://github.com/apache/flink/assets/40290566/4cb59e0c-4732-464a-bde8-6b429daccde7)
   
   Command used to verify changes are working properly 
   
   ```
   Flink SQL> create table dummy_table (
   >   id int,
   >   data string
   > ) with (
   >   'connector' = 'filesystem',
   >   'path' = 's3://prabhuflinks3/dummy_table/',
   >   'format' = 'csv'
   > );
   [INFO] Execute statement succeed.
   
   Flink SQL> INSERT INTO dummy_table_1 SELECT * FROM (VALUES (1, 'Hello 
World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 
'LINE'));
   ```
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
     - The serializers: (yes / no / don't know) no
     - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
     - The S3 file system connector: (yes / no / don't know) yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented) N/A
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to