Hi Maulik,
Do you try flink 1.9, is the problem is still there?

Best,
Congxian


Maulik Soneji <maulik.son...@gojek.com> 于2019年10月24日周四 下午9:03写道:

> Hi everyone,
>
> We are running a Batch job on flink that reads data from GCS and does some
> aggregation on this data.
> We are intermittently getting issue: `No filesystem found for scheme gs`
>
> We are running Beam version 2.15.0 with FlinkRunner, Flink version: 1.6.4
>
> On remote debugging the task managers, we found that in a few task
> managers, the *GcsFileSystemRegistrar is not added to the list of
> FileSystem Schemes*. In these task managers, we get this issue.
>
> The collection *SCHEME_TO_FILESYSTEM* is getting modified only in
> *setDefaultPipelineOptions* function call in
> org.apache.beam.sdk.io.FileSystems class and this function is not getting
> called and thus the GcsFileSystemRegistrar is not added to
> *SCHEME_TO_FILESYSTEM*.
>
> *Detailed stacktrace:*
>
>
> java.lang.IllegalArgumentException: No filesystem found for scheme gs
>         at org.apache.beam.sdk.io
> .FileSystems.getFileSystemInternal(FileSystems.java:463)
>         at org.apache.beam.sdk.io
> .FileSystems.matchNewResource(FileSystems.java:533)
>         at
> org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>         at
> org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
>         at
> org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
>         at
> org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
>         at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
>         at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
>         at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>         at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io
> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>         at org.apache.flink.runtime.io
> .network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>         at org.apache.flink.runtime.io
> .network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>         at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
>
> Inorder to resolve this issue, we tried calling the following in
> PTransform's expand function:
>
> FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
>
> This function call is to make sure that the GcsFileSystemRegistrar is added
> to the list, but this hasn't solved the issue.
>
> Can someone please help in checking why this might be happening and what
> can be done to resolve this issue.
>
> Thanks and Regards,
> Maulik
>

Reply via email to