kennknowles opened a new issue, #19148:
URL: https://github.com/apache/beam/issues/19148

   After upgrading to Beam 2.7.0 and Flink 1.5.2 (from Beam 2.2.0), the job 
throws a _No such file or directory_ exception a few times a day when trying to 
move a temp bundle to its final location.
   
   After digging into the code, it looks like an S3 eventual consistency 
problem: the HEAD request used to check whether the temporary file exists 
before copying it to the final location returns 404, and the whole copy 
operation fails.
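
   As an illustration of one possible mitigation (not something this report 
shows Beam or Hadoop doing), the copy could be retried with backoff when the 
source is reported missing. The sketch below is standalone Java; the class 
`RetryOnMissingSource`, the `IoAction` interface, and the attempt and backoff 
values are hypothetical names chosen for this example, and the lambda in `main` 
only simulates the flaky copy. Whether a retry is safe inside Beam's 
finalization step has not been verified here.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

public final class RetryOnMissingSource {

    /** Hypothetical stand-in for an I/O step such as "copy temp file to final location". */
    @FunctionalInterface
    public interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action, retrying only when it throws FileNotFoundException.
     * Sleeps initialBackoffMillis before the second attempt and doubles the
     * delay before each later attempt. Returns the attempt number that succeeded.
     */
    public static int runWithRetry(IoAction action, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (FileNotFoundException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the original error
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated copy: reports a missing source twice, then succeeds.
        int[] calls = {0};
        int attempts = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new FileNotFoundException("No such file or directory (simulated)");
            }
        }, 5, 10);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

   A retry like this only hides the symptom, and it helps only if the stale 
404 clears within the retry window.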
   
   We use sharded writes (32 shards). The job outputs around 256 files a 
minute, but the exception is thrown at most 3 times a day, which suggests that 
there is a race condition somewhere.
   
    
   
   S3 falls back to eventual consistency when a check for the file's existence 
is made before the file is uploaded. I checked the Beam FileSink and couldn't 
find any logic that pre-checks whether the temp bundle file exists. 
   
    
   ```
   
   Amazon S3 provides read-after-write consistency for PUTS of new objects in 
your S3 bucket in all regions
   with one caveat. The caveat is that if you make a HEAD or GET request to the 
key name (to find if the
   object exists) before creating the object, Amazon S3 provides eventual 
consistency for read-after-write.
   ```
   
    
   
   The logs from the job
   ```
   
   2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
   
   2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - Successfully wrote temporary file s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
   
   2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz
   ```
   
    
   ```
   
   Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
       at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
       at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
       at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
       at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
       at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
       at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
       at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:128)
       at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
       at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
       at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
       at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:628)
       at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:80)
       at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:204)
       at org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
       at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
       at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
       at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
       at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
       at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:71)
       at org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
       at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
       at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
       at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
       at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
       at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
       at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:101)
       at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
       at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:99)
       at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:92)
       at org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
       at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
       at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
       at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
       at org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
       at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
       at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
       at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
       at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
       at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
       at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
       at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
       at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
       at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:367)
       at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
       at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.copy(HadoopFileSystem.java:132)
       at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:288)
       at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:761)
       at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:797)
   ```
   
    
   
   CloudTrail logs (please note that the order of API calls shown might not 
reflect reality, as eventtime has only second precision):
   ||eventtime||operation||errormessage||requestparameters||
   |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
   |2018-10-29T17:45:03Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
   |2018-10-29T17:45:03Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
   |2018-10-29T17:45:04Z|PutObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
   |2018-10-29T17:45:05Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
   |2018-10-29T17:45:06Z|ListObjects| |{"bucketName":"XXX","max-keys":"1","encoding-type":"url","prefix":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/","delimiter":"/"}|
   |2018-10-29T17:45:06Z|HeadObject| |{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
   |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4"}|
   |2018-10-29T17:45:06Z|HeadObject|NoSuchKey|{"bucketName":"XXX","key":"beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4/"}|
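
   To check other CloudTrail exports for the same pattern, a small scan like 
the following might help. It is a sketch in plain Java, not part of Beam or 
the AWS SDK: the `HeadBeforePut` class, its `Event` type, and the method name 
are hypothetical, the rows are assumed to be already parsed, and list order is 
taken as event order, which second-precision timestamps cannot guarantee. It 
flags keys for which a `HeadObject` returned `NoSuchKey` before the first 
`PutObject` on the same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class HeadBeforePut {

    /** One CloudTrail row, reduced to the columns of the table in this report. */
    public static final class Event {
        final String eventTime;
        final String operation;
        final String errorMessage;
        final String key;

        public Event(String eventTime, String operation, String errorMessage, String key) {
            this.eventTime = eventTime;
            this.operation = operation;
            this.errorMessage = errorMessage;
            this.key = key;
        }
    }

    /** Returns keys that were probed (HeadObject, NoSuchKey) before their first PutObject. */
    public static List<String> keysProbedBeforeCreate(List<Event> events) {
        Set<String> probedMissing = new HashSet<>();
        Set<String> created = new HashSet<>();
        List<String> flagged = new ArrayList<>();
        for (Event e : events) {
            if ("HeadObject".equals(e.operation)
                    && "NoSuchKey".equals(e.errorMessage)
                    && !created.contains(e.key)) {
                probedMissing.add(e.key);
            } else if ("PutObject".equals(e.operation)) {
                // Only the first PutObject per key matters for this check.
                if (created.add(e.key) && probedMissing.contains(e.key)) {
                    flagged.add(e.key);
                }
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        String key = "beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4";
        // First rows of the table in this report (ListObjects rows omitted).
        List<Event> events = Arrays.asList(
                new Event("2018-10-29T17:45:03Z", "HeadObject", "NoSuchKey", key),
                new Event("2018-10-29T17:45:03Z", "HeadObject", "NoSuchKey", key + "/"),
                new Event("2018-10-29T17:45:04Z", "PutObject", "", key),
                new Event("2018-10-29T17:45:05Z", "HeadObject", "", key));
        System.out.println("Probed before create: " + keysProbedBeforeCreate(events));
    }
}
```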
   
   Imported from Jira 
[BEAM-5934](https://issues.apache.org/jira/browse/BEAM-5934). Original Jira may 
contain additional context.
   Reported by: pawelbartoszek.

