[ https://issues.apache.org/jira/browse/FLINK-26427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499620#comment-17499620 ]
JOHN ERVINE commented on FLINK-26427:
-------------------------------------
This issue can be closed. The new FileSink API solved the problem.
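For reference, a minimal sketch of the equivalent FileSink setup, assuming the same OrcBulkWriterFactory, bucket assigner and rolling policy as in the original snippet quoted below (all other class names are taken from that snippet and assumed unchanged):

{code:java}
// Minimal sketch only: FileSink replacement for the StreamingFileSink quoted below.
// OrderBookRowVectorizer, OrderBookBucketingAssigner, ArmadaRow, writerProperties and
// PARAMETER_TOOL_CONFIG are from the original job and assumed unchanged.
FileSink<ArmadaRow> orderBookSink = FileSink
        .forBulkFormat(new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
                new OrcBulkWriterFactory<>(new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA),
                        writerProperties, new Configuration()))
        .withBucketAssigner(new OrderBookBucketingAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();

// FileSink uses the unified Sink API, so it is attached with sinkTo(...) instead of addSink(...):
// orderBookStream.sinkTo(orderBookSink);
{code}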
> Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3
> (Race Condition)
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-26427
> URL: https://issues.apache.org/jira/browse/FLINK-26427
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.1
> Reporter: JOHN ERVINE
> Priority: Blocker
>
> I'm experiencing some odd behaviour when writing ORC files to S3 using Flink's
> Streaming File Sink.
>
>
> {code:java}
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"),
>         CheckpointingMode.EXACTLY_ONCE);
> env.getConfig().enableObjectReuse();
>
> Properties writerProperties = new Properties();
> writerProperties.put("orc.compress", "SNAPPY");
>
> // Order Book Sink
> StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
>         .forBulkFormat(new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
>                 new OrcBulkWriterFactory<>(new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA),
>                         writerProperties, new Configuration()))
>         .withBucketAssigner(new OrderBookBucketingAssigner())
>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>         .build();
> {code}
>
> I noticed when running queries during ingest of the data that my row counts
> were decreasing as the job progressed. I've had a look at S3 and I can
> see multiple versions of the same part file. The example below shows that part
> file 15-7 has two versions. The first file is 20.7 MB and the last file that's
> committed is smaller, at 5.1 MB. In most cases the committed file is the larger
> one, but there are a few examples in the screenshot below where this is not
> the case.
>
> !https://i.stack.imgur.com/soU4b.png|width=2173,height=603!
>
> This looks like a typical race condition, or a failure to commit the uploads
> to S3 successfully, because the log below shows two commits for the same file
> very close together. The last commit is at 20:44, but the last-modified date
> in S3 is 20:43. I don't see any logs indicating a failed commit. This is
> currently a blocker for us.
>
> {code:java}
> 2022-02-28T20:44:03.526+0000 INFO APP=${sys:AppID} COMP=${sys:CompID}
> APPNAME=${sys:AppName} S3Committer:64 - Committing
> staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7
> with MPU ID
> vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--
> 2022-02-28T20:44:03.224+0000 INFO APP=${sys:AppID} COMP=${sys:CompID}
> APPNAME=${sys:AppName} S3Committer:64 - Committing
> staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7
> with MPU ID
> jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--{code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)