Peter Georgantas created BEAM-10600:
---------------------------------------
Summary: Filesystems not properly registered using ... on FlinkRunner
Key: BEAM-10600
URL: https://issues.apache.org/jira/browse/BEAM-10600
Project: Beam
Issue Type: Bug
Components: runner-flink, sdk-java-core
Affects Versions: 2.22.0
Reporter: Peter Georgantas
This seems to be very similar to this previously closed issue:
https://issues.apache.org/jira/browse/BEAM-8303
Based on the timing of when I most frequently get the error, it appears to be
related to the read side rather than the write side of the previous issue.
{code:java}
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
	at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1165)
	at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1121)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:604)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:595)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:541)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:112)
	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748){code}
For reference, my read code resembles this ({{FileIO.matchAll()}} is the variant
that consumes a {{PCollection<String>}} of filepatterns; {{FileIO.match()}} takes
a single filepattern and starts from {{PBegin}}):
{code:java}
pipeline.apply(Create.of("s3://bucket/prefix1", "s3://bucket/prefix2"))
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
{code}
Looking at the PR (https://github.com/apache/beam/pull/9688) associated with
the previous issue, the fix appears to have been to initialize the filesystems
across a number of different TransformTranslators. Is it possible that PR did
not cover all use cases, or that a new case has been introduced?
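If this is the same registration gap, one workaround that may help in the
meantime is to eagerly initialize the filesystem registry from the pipeline
options before the pipeline runs. This is only a sketch, not a fix for the
root cause: it assumes the S3 filesystem registrar (from the AWS IO module) is
on the classpath of the process where it executes, and it does not help if the
failing decode happens in a Flink task manager JVM where this code never runs.
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EagerFileSystemInit {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Force registration of all FileSystemRegistrar implementations
    // (discovered via ServiceLoader) so that scheme lookups like "s3"
    // resolve before any coder tries to call FileSystems.matchNewResource.
    FileSystems.setDefaultPipelineOptions(options);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the read/write transforms as usual ...
    pipeline.run();
  }
}
{code}
The stack trace above suggests the decode happens inside
CoderTypeSerializer on the Flink worker, which is why the PR linked above had
to perform this initialization inside the runner's translators rather than in
user code.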
--
This message was sent by Atlassian Jira
(v8.3.4#803005)