Hi Maulik,
Thanks for reporting. As Preston already pointed out, this is fixed in
the upcoming 2.17.0 release.
Thanks,
Max
On 24.10.19 15:20, Koprivica, Preston Blake wrote:
Hi Maulik,
I believe you may be witnessing this issue:
https://issues.apache.org/jira/browse/BEAM-8303. We ran into it using
Beam 2.15.0 on Flink 1.8 with S3. It looks like it’ll be fixed in 2.17.0.
As a temporary workaround, if you’re using the FileIO API, you can call
withNoSpilling() on the write. If you aren’t using FileIO, it should be
relatively easy to switch to it.
*From: *Maulik Soneji <maulik.son...@gojek.com>
*Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
*Date: *Thursday, October 24, 2019 at 7:05 AM
*To: *"dev@beam.apache.org" <dev@beam.apache.org>
*Subject: *Intermittent No FileSystem found exception
Hi everyone,
We are running a batch job on Flink that reads data from GCS and
aggregates it.
We intermittently get the error: `No filesystem found for scheme gs`
We are running Beam 2.15.0 with the FlinkRunner on Flink 1.6.4.
While remotely debugging the task managers, we found that on a few of
them *GcsFileSystemRegistrar is not added to the list of FileSystem
schemes*. These are the task managers where we get the error.
The collection *SCHEME_TO_FILESYSTEM* is only modified in the
*setDefaultPipelineOptions* method of the
org.apache.beam.sdk.io.FileSystems class. On those task managers this
method is never called, so GcsFileSystemRegistrar is never added to
*SCHEME_TO_FILESYSTEM*.
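To make the mechanism described above concrete, here is a minimal sketch assuming a simplified, static scheme-to-filesystem map that is filled only by an explicit registration call, analogous to SCHEME_TO_FILESYSTEM and setDefaultPipelineOptions. The class SchemeRegistry and its methods are hypothetical and do not reproduce Beam's FileSystems code; treating paths without a scheme as "file" is also an assumption of the model.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical, simplified model of a static scheme -> filesystem registry.
 * SchemeRegistry, registerDefaults() and lookup() are illustrative names only;
 * this is not Beam's FileSystems implementation.
 */
public class SchemeRegistry {
  // Static, so it is shared per JVM/classloader and starts out empty.
  private static volatile Map<String, String> schemeToFileSystem = Collections.emptyMap();

  // Stand-in for FileSystems.setDefaultPipelineOptions(...): the only code
  // path that fills the map.
  static synchronized void registerDefaults() {
    Map<String, String> registered = new HashMap<>();
    registered.put("file", "LocalFileSystem");
    registered.put("gs", "GcsFileSystem"); // what GcsFileSystemRegistrar would contribute
    schemeToFileSystem = Collections.unmodifiableMap(registered);
  }

  // Clears the registry, simulating a JVM where registration never ran.
  static synchronized void reset() {
    schemeToFileSystem = Collections.emptyMap();
  }

  // Resolves a path such as "gs://bucket/obj" to the filesystem registered for its scheme.
  static String lookup(String spec) {
    int idx = spec.indexOf("://");
    String scheme = idx < 0 ? "file" : spec.substring(0, idx).toLowerCase(Locale.ROOT);
    String fileSystem = schemeToFileSystem.get(scheme);
    if (fileSystem == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fileSystem;
  }

  public static void main(String[] args) {
    // Registration has not run in this JVM yet, so the gs:// lookup fails.
    try {
      lookup("gs://my-bucket/input/part-00000");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "No filesystem found for scheme gs"
    }
    // Once registration has run in this JVM, the same lookup succeeds.
    registerDefaults();
    System.out.println(lookup("gs://my-bucket/input/part-00000")); // prints "GcsFileSystem"
  }
}
```

The point of the model is that the error depends only on whether the registration call has run in the JVM doing the lookup, which would explain why only some task managers are affected.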
*Detailed stacktrace:*
java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
    at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
In order to resolve this issue, we tried calling the following in our
PTransform's expand() method:
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
This call was meant to make sure that GcsFileSystemRegistrar is added to
the list, but it hasn't solved the issue.
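One possible reason the call in expand() does not help: as far as we understand, PTransform.expand() runs while the pipeline is being constructed, in the JVM that submits the job, not on the Flink task managers. The trace above also suggests the failure happens inside Flink's NoOpDriver while deserializing records, a path where no Beam user function runs that could perform the registration. The minimal sketch below models this, assuming registry state is per JVM. ProcessRegistry, registerDefaults(), decode() and the registerOnDecode flag are hypothetical; the flag only illustrates "register in the process that decodes" and is not a claim about how the 2.17.0 fix is implemented.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical model: each ProcessRegistry instance stands for the registry
 * state of one JVM (the client that constructs the pipeline, or a Flink task
 * manager). In the real FileSystems class this state is static, i.e. per
 * JVM/classloader; one instance per "process" models that here. All names
 * are illustrative, not Beam APIs.
 */
public class ProcessRegistry {
  private final Map<String, String> schemeToFileSystem = new HashMap<>();
  // When true, the decode path registers defaults itself if nothing is registered yet.
  private final boolean registerOnDecode;

  ProcessRegistry(boolean registerOnDecode) {
    this.registerOnDecode = registerOnDecode;
  }

  // Stand-in for FileSystems.setDefaultPipelineOptions(...) in this process.
  void registerDefaults() {
    schemeToFileSystem.put("file", "LocalFileSystem");
    schemeToFileSystem.put("gs", "GcsFileSystem");
  }

  // Stand-in for a coder decoding a path and resolving its filesystem,
  // as in ResourceIdCoder.decode -> FileSystems.matchNewResource.
  String decode(String path) {
    if (registerOnDecode && schemeToFileSystem.isEmpty()) {
      registerDefaults();
    }
    int idx = path.indexOf("://");
    String scheme = idx < 0 ? "file" : path.substring(0, idx).toLowerCase(Locale.ROOT);
    String fileSystem = schemeToFileSystem.get(scheme);
    if (fileSystem == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fileSystem;
  }

  public static void main(String[] args) {
    ProcessRegistry client = new ProcessRegistry(false);
    ProcessRegistry taskManager = new ProcessRegistry(false);

    // expand() runs during pipeline construction, so the registration lands in the client.
    client.registerDefaults();
    System.out.println(client.decode("gs://my-bucket/a")); // prints "GcsFileSystem"

    // The task manager never ran the registration, so decoding still fails there.
    try {
      taskManager.decode("gs://my-bucket/a");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "No filesystem found for scheme gs"
    }

    // Registering in the process that does the decoding avoids the error.
    ProcessRegistry patchedTaskManager = new ProcessRegistry(true);
    System.out.println(patchedTaskManager.decode("gs://my-bucket/a")); // prints "GcsFileSystem"
  }
}
```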
Could someone please help us figure out why this might be happening and
what can be done to resolve it?
Thanks and Regards,
Maulik