[
https://issues.apache.org/jira/browse/FLINK-14574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
clay updated FLINK-14574:
-------------------------
Description:
As reported by a user via [mailing
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-FileSystem-for-scheme-quot-file-quot-for-S3A-in-and-state-processor-api-in-1-9-td30704.html]:
{noformat}
We've added flink-s3-fs-hadoop library to plugins folder and trying to
bootstrap state to S3 using S3A protocol. The following exception happens
(unless hadoop library is put to lib folder instead of plugins). Looks like
S3A filesystem is trying to use "local" filesystem for temporary files and
fails:
java.lang.Exception: Could not write timer service of MapPartition
(d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
at
org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
at
org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
at
org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at
org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not open output stream for state
backend
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at
org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
at
org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
at
org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
at
org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
at
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
at
org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
at
org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
... 14 common frames omitted
Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException:
No FileSystem for scheme "file"
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 32 common frames omitted
{noformat}
I think the problem is caused by
{{org.apache.hadoop.fs.FileSystem#loadFileSystems}} method inside
{{flink-s3-fs-hadoop}}, which is using
{{ServiceLoader.load(FileSystem.class);}} to load a FileSystem via
{{Thread.currentThread().getContextClassLoader();}}. At this point of time
{{getContextClassLoader()}} is probably already the user class loader instead
of plugin's.
We should investigate why is this {{loadFileSystems}} method called so late
(after we have already restored user's class loader) and how can we workaround
this.
was:
As reported by a user via [mailing
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-FileSystem-for-scheme-quot-file-quot-for-S3A-in-and-state-processor-api-in-1-9-td30704.html]:
{noformat}
We've added flink-s3-fs-hadoop library to plugins folder and trying to
bootstrap state to S3 using S3A protocol. The following exception happens
(unless hadoop library is put to lib folder instead of plugins). Looks like
S3A filesystem is trying to use "local" filesystem for temporary files and
fails:
java.lang.Exception: Could not write timer service of MapPartition
(d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
at
org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
at
org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
at
org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at
org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not open output stream for state
backend
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at
org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
at
org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
at
org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
at
org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
at
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
at
org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
at
org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
... 14 common frames omitted
Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException:
No FileSystem for scheme "file"
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 32 common frames omitted
{noformat}
I think the problem is caused by
{{org.apache.hadoop.fs.FileSystem#loadFileSystems}} method inside
{{flink-s3-fs-hadoop}}, which is using
{{ServiceLoader.load(FileSystem.class);}} to load a FileSystem via
{{Thread.currentThread().getContextClassLoader();}}. At this point of time
{{getContextClassLoader()}} is probably already the user class loader instead
of plugin's.
We should investigate why is this {{loadFileSystems}} method called so late
(after we have already restored user's class loader) and how can we workaround
this.
> flink-s3-fs-hadoop doesn't work with plugins mechanism
> -------------------------------------------------------
>
> Key: FLINK-14574
> URL: https://issues.apache.org/jira/browse/FLINK-14574
> Project: Flink
> Issue Type: Task
> Components: FileSystems
> Affects Versions: 1.9.1
> Reporter: Piotr Nowojski
> Assignee: Arvid Heise
> Priority: Blocker
> Fix For: 1.10.0
>
>
> As reported by a user via [mailing
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-FileSystem-for-scheme-quot-file-quot-for-S3A-in-and-state-processor-api-in-1-9-td30704.html]:
> {noformat}
> We've added flink-s3-fs-hadoop library to plugins folder and trying to
> bootstrap state to S3 using S3A protocol. The following exception happens
> (unless hadoop library is put to lib folder instead of plugins). Looks like
> S3A filesystem is trying to use "local" filesystem for temporary files and
> fails:
> java.lang.Exception: Could not write timer service of MapPartition
> (d2976134f80849779b7a94b7e6218476) (4/4) to checkpoint state stream.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> at
> org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:59)
> at
> org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:84)
> at
> org.apache.flink.state.api.output.BoundedStreamTask.performDefaultAction(BoundedStreamTask.java:85)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at
> org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76)
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Could not open output stream for state
> backend
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
> at
> org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
> at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
> at
> org.apache.flink.util.LinkedOptionalMapSerializer.lambda$writeOptionalMap$0(LinkedOptionalMapSerializer.java:58)
> at
> org.apache.flink.util.LinkedOptionalMap.forEach(LinkedOptionalMap.java:163)
> at
> org.apache.flink.util.LinkedOptionalMapSerializer.writeOptionalMap(LinkedOptionalMapSerializer.java:57)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeKryoRegistrations(KryoSerializerSnapshotData.java:141)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.writeSnapshotData(KryoSerializerSnapshotData.java:128)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.writeSnapshot(KryoSerializerSnapshot.java:72)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
> at
> org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:199)
> at
> org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
> ... 14 common frames omitted
> Caused by:
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.UnsupportedFileSystemException:
> No FileSystem for scheme "file"
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:433)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:301)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:378)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
> at
> org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
> ... 32 common frames omitted
> {noformat}
> I think the problem is caused by
> {{org.apache.hadoop.fs.FileSystem#loadFileSystems}} method inside
> {{flink-s3-fs-hadoop}}, which is using
> {{ServiceLoader.load(FileSystem.class);}} to load a FileSystem via
> {{Thread.currentThread().getContextClassLoader();}}. At this point of time
> {{getContextClassLoader()}} is probably already the user class loader instead
> of plugin's.
> We should investigate why is this {{loadFileSystems}} method called so late
> (after we have already restored user's class loader) and how can we
> workaround this.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)