[
https://issues.apache.org/jira/browse/FLINK-26427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
JOHN ERVINE updated FLINK-26427:
--------------------------------
Description:
I'm experiencing some odd behaviour when writing ORC files to S3 using Flink's
Streaming File Sink.
{code:java}
import java.util.Properties;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.hadoop.conf.Configuration;

// Set up the streaming execution environment with exactly-once checkpointing.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(
        PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"),
        CheckpointingMode.EXACTLY_ONCE);
env.getConfig().enableObjectReuse();

// ORC writer settings: Snappy-compressed stripes.
Properties writerProperties = new Properties();
writerProperties.put("orc.compress", "SNAPPY");

// Order book sink: bulk ORC writer that rolls part files on every checkpoint.
StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
        .forBulkFormat(
                new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
                new OrcBulkWriterFactory<>(
                        new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA),
                        writerProperties,
                        new Configuration()))
        .withBucketAssigner(new OrderBookBucketingAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
{code}
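The custom bucket assigner referenced above is not included in the report. For context, a minimal sketch of what such an assigner could look like, assuming hypothetical ArmadaRow getters that match the partition keys visible in the committed paths (cyc_dt, inst_exch_mrkt_id, inst_ast_sub_clas):
{code:java}
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Hypothetical reconstruction: routes each row to a Hive-style partition path,
// e.g. cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy.
public class OrderBookBucketingAssigner implements BucketAssigner<ArmadaRow, String> {

    @Override
    public String getBucketId(ArmadaRow row, Context context) {
        // getCycleDate(), getMarketId() and getAssetSubClass() are assumed accessors.
        return "cyc_dt=" + row.getCycleDate()
                + "/inst_exch_mrkt_id=" + row.getMarketId()
                + "/inst_ast_sub_clas=" + row.getAssetSubClass();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
{code}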
I noticed when running queries during ingest that my row counts were decreasing
as the job progressed. I've had a look at S3 and I can see multiple versions of
the same part file. The example below shows that part file 15-7 has two
versions: the first file is 20.7 MB, while the last file that's committed is
smaller at 5.1 MB. In most cases the most recent version is the larger one, but
there are a few examples in the screenshot below where this is not the case.
!https://i.stack.imgur.com/soU4b.png|width=2173,height=603!
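To confirm the duplicate versions outside the S3 console, the stored versions of each part file can be listed programmatically. A minimal sketch using the AWS SDK for Java v2 (the bucket name is a placeholder):
{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.ObjectVersion;

// Lists every stored version under the affected prefix so that the size and
// last-modified time of each version of a part file can be compared.
try (S3Client s3 = S3Client.create()) {
    ListObjectVersionsRequest request = ListObjectVersionsRequest.builder()
            .bucket("my-bucket") // placeholder
            .prefix("staging/marketdata/t_stg_globex_order_book_test2/")
            .build();
    for (ObjectVersion v : s3.listObjectVersions(request).versions()) {
        System.out.printf("%s version=%s latest=%s size=%d modified=%s%n",
                v.key(), v.versionId(), v.isLatest(), v.size(), v.lastModified());
    }
}
{code}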
This looks like a typical race condition, or a failure to commit uploads to S3
successfully, because the log below shows two commits for the same file very
close together. The last commit is logged at 20:44, but the last-modified date
in S3 is 20:43. I don't see any logs indicating a failure to commit. This is
currently a blocker for us.
{code:java}
2022-02-28T20:44:03.526+0000 INFO APP=${sys:AppID} COMP=${sys:CompID}
APPNAME=${sys:AppName} S3Committer:64 - Committing
staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7
with MPU ID
vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--
2022-02-28T20:44:03.224+0000 INFO APP=${sys:AppID} COMP=${sys:CompID}
APPNAME=${sys:AppName} S3Committer:64 - Committing
staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7
with MPU ID
jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--{code}
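A quick way to find every part file that was committed more than once is to group the S3Committer lines by object key. A throwaway sketch, assuming each commit is logged on a single line as above (the log file name is a placeholder):
{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Counts "Committing <key> with MPU ID" occurrences per object key and
// prints any key that was committed more than once.
Pattern commit = Pattern.compile("Committing (\\S+) with MPU ID");
Map<String, Integer> commitsPerKey = new HashMap<>();
for (String line : Files.readAllLines(Paths.get("taskmanager.log"))) { // placeholder
    Matcher m = commit.matcher(line);
    if (m.find()) {
        commitsPerKey.merge(m.group(1), 1, Integer::sum);
    }
}
commitsPerKey.forEach((key, count) -> {
    if (count > 1) {
        System.out.println(key + " committed " + count + " times");
    }
});
{code}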
> Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-26427
> URL: https://issues.apache.org/jira/browse/FLINK-26427
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.1
> Reporter: JOHN ERVINE
> Priority: Blocker
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)