[
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937963#comment-16937963
]
Kenneth Knowles commented on BEAM-8303:
---------------------------------------
Copying reply from mailing list here, so it is more discoverable:
Are you building a fat jar? I can't say for certain that this is your issue,
since I don't know Flink's operation in any close detail or how it relates to
what Max has described, but it is a common cause of this kind of error.
The registration of the filesystem is here:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java#L32
This results in the built jar for beam-sdks-java-io-amazon-web-services
containing a file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar
with the line org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar.
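At startup the SDK discovers registrars through java.util.ServiceLoader, which
reads exactly those service files. Here is a minimal sketch (not the SDK's
internal code) of what that discovery amounts to; it can also be used to check
which registrars are actually visible on a given classpath:
{code:java}
// Minimal sketch of ServiceLoader-based discovery (not the SDK's internal code).
// It lists every FileSystemRegistrar reachable through
// META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar on the classpath.
import java.util.ServiceLoader;
import org.apache.beam.sdk.io.FileSystemRegistrar;

public class ListFileSystemRegistrars {
  public static void main(String[] args) {
    for (FileSystemRegistrar registrar : ServiceLoader.load(FileSystemRegistrar.class)) {
      // With beam-sdks-java-io-amazon-web-services on the classpath this should
      // print org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar among others.
      System.out.println(registrar.getClass().getName());
    }
  }
}
{code}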
The file META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar exists in
many dependencies, including the core SDK. With the default settings of many
fat jar tools (Maven Shade and Gradle Shadow), these duplicate service files
nondeterministically clobber each other, and you have to add something like
Shadow's "mergeServiceFiles" (or Shade's ServicesResourceTransformer) to your
configuration to keep all the registrations.
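One way to check whether that has happened in a particular fat jar is to look
at what the merged service file actually contains. A hedged diagnostic sketch
(the class name is mine, not from the report):
{code:java}
// Hedged diagnostic sketch: print every copy of the registrar service file
// visible to the classloader, and its contents. In a fat jar there is typically
// a single copy; if it no longer lists
// org.apache.beam.sdk.io.aws.s3.S3FileSystemRegistrar, the S3 registration was
// clobbered by the shade/shadow step.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;

public class CheckRegistrarServiceFiles {
  public static void main(String[] args) throws Exception {
    Enumeration<URL> resources = CheckRegistrarServiceFiles.class.getClassLoader()
        .getResources("META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar");
    while (resources.hasMoreElements()) {
      URL url = resources.nextElement();
      System.out.println(url);
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
        reader.lines().forEach(line -> System.out.println("    " + line));
      }
    }
  }
}
{code}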
> Filesystems not properly registered using FileIO.write()
> --------------------------------------------------------
>
> Key: BEAM-8303
> URL: https://issues.apache.org/jira/browse/BEAM-8303
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.15.0
> Reporter: Preston Koprivica
> Priority: Major
>
> I’m getting the following error when attempting to use the FileIO APIs
> (beam-2.15.0) and integrating with AWS S3. I have set up the PipelineOptions
> with all the relevant AWS options, so the filesystem registry **should** be
> properly seeded by the time the graph is compiled and executed:
> {code:java}
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> {code}
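> To illustrate what "all the relevant AWS options" refers to, the options
> seeding looks roughly like the sketch below (the region and other values are
> placeholders, not the actual configuration used here):
> {code:java}
> // Rough sketch of seeding the AWS options before building the pipeline;
> // values are placeholders.
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.aws.options.AwsOptions;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class PipelineSetupSketch {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>     // AwsOptions carries the region/credentials the S3 filesystem needs.
>     options.as(AwsOptions.class).setAwsRegion("us-east-1");
>     Pipeline pipeline = Pipeline.create(options);
>     // ... build the FileIO.write() transform shown below and run the pipeline
>   }
> }
> {code}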
> For reference, the write code resembles this:
> {code:java}
> FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(schema))
>     .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>     .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
> {code}
> The issue does not appear to be related to ParquetIO.sink(). I am able to
> reliably reproduce the issue using JSON-formatted records and TextIO.sink()
> as well. Moreover, AvroIO is affected if the withWindowedWrites() option is
> added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only to have it reemerge as I scaled
> up the data set size. The stack trace, while very similar, reads:
> {code:java}
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> And lastly, I tried adding the following deprecated option (with and without
> the withNoSpilling() option):
> {code:java}
> write = write.withIgnoreWindowing(); {code}
> This seemed to fix the issue altogether, but aside from having to rely on a
> deprecated feature, there is the bigger question of why it makes a difference.
>
> In reading through some of the source, it seems to be a common pattern to
> manually register the pipeline options to seed the filesystem registry during
> the setup part of the operator lifecycle, e.g.:
> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
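> That pattern boils down to a call like the following sketch (simplified from
> the linked code; the surrounding operator plumbing is omitted):
> {code:java}
> // Simplified sketch of the setup-time registration pattern linked above; the
> // actual DoFnOperator does this with the deserialized options it carries.
> import org.apache.beam.sdk.io.FileSystems;
> import org.apache.beam.sdk.options.PipelineOptions;
>
> public class OperatorSetupSketch {
>   public void setup(PipelineOptions options) {
>     // Re-seeds the FileSystems registry on the worker so that schemes like
>     // s3:// resolve before any FileResultCoder.decode() call needs them.
>     FileSystems.setDefaultPipelineOptions(options);
>   }
> }
> {code}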
>
>
> Is it possible that I have hit upon a couple of scenarios where that
> registration has not taken place?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)