Hey Preston,

I just wrote a reply on the user mailing list. Copying the reply here just in case:

----

Your observation seems to be correct. There is an issue with the file system registration.

The two types of errors you are seeing, as well as the successful run, are just due to the different structure of the generated transforms. The Flink scheduler will distribute them differently, which results in some pipelines being placed on task managers which happen to execute the FileSystems initialization code and others not.

There is a quick fix to at least initialize the file system in case it has not been initialized, by adding the loading code here: https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463

However, there we do not have the pipeline options available, which prevents any configuration. The problem is that the error occurs in the coder used in a native Flink operation which does not even run user code.

I believe the only way fix this is to ship the FileSystems initialization code in CoderTypeSerializer where we are sure to execute it in time for any coders which depend on it.

Could you file an issue? I'd be happy to fix this then.

Thanks,
Max

----

On 23.09.19 08:04, Koprivica,Preston Blake wrote:
Hello everyone. This is a cross-post from the users list.  It didn’t get much traction, so I thought I’d move over to the dev group, since this seems like it might be a an issue with initialization in the FlinkRunner (apologies in advance if I missed something silly).

I’m getting the following error when attempting to use the FileIO apis (beam-2.15.0) and integrating with AWS S3.  I have setup the PipelineOptions with all the relevant AWS options, so the filesystem registry **should* *be properly seeded by the time the graph is compiled and executed:

java.lang.IllegalArgumentException: No filesystem found for scheme s3

    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)

    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)

    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)

    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)

     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)

    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)

    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)

    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)

    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)

    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)

    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)

    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)

    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)

    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)

     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)

    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)

     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

     at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()

                 .via(ParquetIO.sink(schema))

                .to(options.getOutputDir()). // will be something like: s3://<bucket>/<path>

                 .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);

The issue does not appear to be related to ParquetIO.sink().  I am able to reliably reproduce the issue using JSON formatted records and TextIO.sink(), as well.

Just trying some different knobs, I went ahead and set the following option:

         write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size.  The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3

    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)

    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)

    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)

    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)

     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)

     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)

     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)

    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)

    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)

    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)

    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)

    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)

    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)

    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)

    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)

     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)

    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)

     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

     at java.lang.Thread.run(Thread.java:748)

I’ll be interested to hear some theories on the differences/similarities in the stacks.  And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):

write = write.withIgnoreWindowing();

This seemed to fix the issue altogether but aside from having to rely on a deprecated feature, there is the bigger issue of why?

In reading through some of the source, it seems a common pattern to have to manually register the pipeline options to seed the filesystem registry during the setup part of the operator lifecycle, e.g.:https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313 <https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Frelease-2.15.0%2Frunners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2Ftranslation%2Fwrappers%2Fstreaming%2FDoFnOperator.java%23L304-L313&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Cca33cc0168834f2d327708d73d463c69%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637045244185938359&sdata=lEH5yuFU3L%2FzT7Qy8m1pTahFG%2FH20AUh9rfQjVfYijI%3D&reserved=0>

Is it possible that I have hit upon a couple scenarios where that has not taken place?  Unfortunately, I’m not yet at a position to suggest a fix, but I’m guessing there’s some missing initialization code in one or more of the batch operators.  If this is indeed a legitimate issue, I’ll be happy to log an issue, but I’ll hold off until the community gets a chance to look at it.

Thanks,

  * Preston

CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.

Reply via email to