Peter Georgantas created BEAM-10600:
---------------------------------------

             Summary: Filesystems not properly registered using ... on FlinkRunner
                 Key: BEAM-10600
                 URL: https://issues.apache.org/jira/browse/BEAM-10600
             Project: Beam
          Issue Type: Bug
          Components: runner-flink, sdk-java-core
    Affects Versions: 2.22.0
            Reporter: Peter Georgantas


This looks very similar to a previously closed issue: 
https://issues.apache.org/jira/browse/BEAM-8303

Based on the timing of when I most frequently get the error, it appears to 
affect the read side rather than the write side covered by that issue.
{code:java}
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
 at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
 at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
 at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1165)
 at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1121)
 at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:604)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:595)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:541)
 at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:112)
 at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
 at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
 at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
 at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
 at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
 at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
 at java.lang.Thread.run(Thread.java:748){code}

For reference, my read code resembles this:
{code:java}
pipeline.apply(Create.of("s3://bucket/prefix1", "s3://bucket/prefix2"))
          .apply(FileIO.match())
          .apply(FileIO.readMatches())
{code}

Looking at the PR (https://github.com/apache/beam/pull/9688) associated with 
the earlier issue, the fix was to initialize the FileSystems across a number 
of different TransformTranslators. Is it possible that the PR did not cover 
all use cases, or that a new case has since been introduced?
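For reference, a sketch of the kind of initialization that PR added (this is an illustrative assumption on my part, not a confirmed workaround; the class name here is hypothetical, though `FileSystems.setDefaultPipelineOptions` is the real Beam API the PR calls):

{code:java}
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterS3FileSystem {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Scans the classpath for FileSystemRegistrar services (the s3 registrar
    // ships in beam-sdks-java-io-amazon-web-services) and registers each
    // scheme, so "s3://..." URIs can be resolved in this JVM.
    FileSystems.setDefaultPipelineOptions(options);
  }
}
{code}

Note this only registers filesystems in the JVM where it runs; the decode in the stack trace above happens on the Flink task managers, which is presumably why the fix had to live in the translators rather than in user code.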



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
