Samrat002 commented on PR #23725:
URL: https://github.com/apache/flink/pull/23725#issuecomment-1873212013

   Thank you, @MartijnVisser, for diving into the PR.
   
   This issue has existed for a long time, since before https://github.com/apache/flink/pull/21458 was merged.
   
   ## How to reproduce this issue
   
   1. Create a cluster and start a Flink session.
   2. Ensure S3 is accessible from the cluster and that the credentials are configured properly.
   3. For simplicity, open a SQL client and execute the following commands:
   
   ```sql
   CREATE TABLE test_table (
      id INT,
      data STRING
    ) WITH (
      'connector' = 'filesystem',
      'path' = 's3://<bucket_name>/<path_to_table>/',
      'format' = 'csv'
    );
   
   INSERT INTO test_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), 
(3, 'Hi'), (4, 'Hello'), (5, 'World'), (6, 'ADD'), (7, 'LINE'));
   ```
   
   Note: Make sure to update `<bucket_name>` and `<path_to_table>` before executing the above two queries.
   
   This issue occurs only when the **`csv`** format is used to write data into S3. Other formats, such as `json`, do not cause any errors.
   
   ## Why this error only with CSV
   From my code analysis: when the `csv` format is used with any connector, Flink instantiates `CsvBulkWriter`, which implements the `BulkWriter` interface.
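   
   For reference, this is the shape of that contract (a sketch of `org.apache.flink.api.common.serialization.BulkWriter`; the comments are mine):
   
   ```java
   import java.io.IOException;
   
   /** Sketch of Flink's BulkWriter contract, which CsvBulkWriter implements. */
   public interface BulkWriter<T> {
   
       /** Serializes one record into the underlying output stream. */
       void addElement(T element) throws IOException;
   
       /** Flushes all buffered data to the stream. */
       void flush() throws IOException;
   
       /** Finalizes the writer; no records may be added after this call. */
       void finish() throws IOException;
   }
   ```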
   
   Data is written to S3 when the `flush` method is executed. `CsvBulkWriter` holds a `stream` field of type `FSDataOutputStream`. When S3 is the target filesystem, that stream is an `S3RecoverableFsDataOutputStream` (`S3RecoverableFsDataOutputStream` extends `RecoverableFsDataOutputStream`, which in turn extends `FSDataOutputStream`). `S3RecoverableFsDataOutputStream` internally wraps a stream of type `RefCountedBufferingFileStream`, and the `sync` call is delegated to that object; `sync` is not a supported operation on `RefCountedBufferingFileStream`, so it throws.
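   
   To make the chain concrete, here is a minimal, self-contained sketch of the delegation. The class names mirror the Flink ones, but the bodies are simplified stand-ins (the real hierarchy also has `RecoverableFsDataOutputStream` in between), not the actual Flink source:
   
   ```java
   import java.io.IOException;
   
   // Simplified stand-ins for the Flink classes involved, not the real implementations.
   abstract class FSDataOutputStream {
       abstract void write(byte[] b) throws IOException;
       abstract void flush() throws IOException;
       abstract void sync() throws IOException;
   }
   
   class RefCountedBufferingFileStream extends FSDataOutputStream {
       @Override void write(byte[] b) { /* buffers to a local temp file */ }
       @Override void flush() { /* flushes the local buffer */ }
       @Override void sync() {
           // The failing call: syncing state to S3 is not supported here.
           throw new UnsupportedOperationException("Cannot sync state to S3.");
       }
   }
   
   class S3RecoverableFsDataOutputStream extends FSDataOutputStream {
       private final RefCountedBufferingFileStream fileStream =
               new RefCountedBufferingFileStream();
   
       @Override void write(byte[] b) throws IOException { fileStream.write(b); }
       @Override void flush() throws IOException { fileStream.flush(); }
       @Override void sync() throws IOException { fileStream.sync(); } // delegates, so it throws
   }
   
   public class SyncFailureDemo {
       public static void main(String[] args) throws IOException {
           FSDataOutputStream stream = new S3RecoverableFsDataOutputStream();
           stream.write(new byte[] {1, 2, 3});
           stream.flush();
           stream.sync(); // throws UnsupportedOperationException, like the CSV path does
       }
   }
   ```
   
   Under this reading, the failure surfaces only on the CSV path because `CsvBulkWriter`'s flush path is the one that ends up calling `sync` on the recoverable S3 stream.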
   
   
   Currently, I am looking into `test_file_sink.sh s3 StreamingFileSink` to understand what more can be added to improve the test coverage.
   

