Hi Maulik,

Did you try Flink 1.9? Is the problem still there?

Best,
Congxian
Maulik Soneji <maulik.son...@gojek.com> wrote on Thu, Oct 24, 2019 at 9:03 PM:
> Hi everyone,
>
> We are running a batch job on Flink that reads data from GCS and does some
> aggregation on this data.
> We are intermittently getting the issue: `No filesystem found for scheme gs`.
>
> We are running Beam version 2.15.0 with FlinkRunner, Flink version 1.6.4.
>
> On remote debugging the task managers, we found that in a few task
> managers, the *GcsFileSystemRegistrar is not added to the list of
> FileSystem schemes*. In these task managers, we get this issue.
>
> The collection *SCHEME_TO_FILESYSTEM* is modified only in the
> *setDefaultPipelineOptions* function of the
> org.apache.beam.sdk.io.FileSystems class. This function is not getting
> called, and thus the GcsFileSystemRegistrar is not added to
> *SCHEME_TO_FILESYSTEM*.
>
> *Detailed stacktrace:*
>
> java.lang.IllegalArgumentException: No filesystem found for scheme gs
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
>     at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> In order to resolve this issue, we tried calling the following in the
> PTransform's expand function:
>
> FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
>
> This call is meant to make sure that the GcsFileSystemRegistrar is added
> to the list, but it hasn't solved the issue.
>
> Can someone please help in checking why this might be happening and what
> can be done to resolve this issue?
>
> Thanks and Regards,
> Maulik
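
One thing worth noting: PTransform.expand runs at pipeline construction time on
the client, not on the task managers, which may be why the call placed there did
not help. Below is a minimal sketch of where such a registration call could live
on the worker side instead, using a hypothetical pass-through DoFn named
RegisterFileSystemsFn (the class name and element type are illustrative, not
from the original pipeline). This is a sketch of the workaround idea, not a
confirmed fix; the stack trace shows the decode failing inside Flink's record
deserialization, which may still run before any user @Setup method on that
operator.

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical pass-through DoFn whose only extra job is to force
    // filesystem registration once per DoFn instance on the worker JVM.
    public class RegisterFileSystemsFn extends DoFn<KV<String, Long>, KV<String, Long>> {

      @Setup
      public void setup() {
        // Loads all FileSystemRegistrar implementations on the classpath
        // (including GcsFileSystemRegistrar) and populates SCHEME_TO_FILESYSTEM.
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // Pass elements through unchanged.
        c.output(c.element());
      }
    }

If the decode happens before any user code runs (as the NoOpDriver frame
suggests), a worker-side static initializer or an upgrade, as asked above, may
be needed instead.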