[ 
https://issues.apache.org/jira/browse/FLINK-30504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng closed FLINK-30504.
-------------------------------
      Assignee: Caizhi Weng
    Resolution: Fixed

master: 929c1110bdbf521ad0a9eb09a65d33f76a2b5990
release-0.3: fe8de4c32c148bb87f5a40649ca2373e88f321d8

> Fix UnsupportedFileSystemSchemeException when writing Table Store on OSS with 
> other engines
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30504
>                 URL: https://issues.apache.org/jira/browse/FLINK-30504
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>    Affects Versions: table-store-0.3.0, table-store-0.4.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: table-store-0.3.0, table-store-0.4.0
>
>
> Currently when writing Table Store tables on OSS with other engines (for 
> example Spark), the following exception will occur.
> {code}
> 22/12/23 17:54:12 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3) 
> (core-1-1.c-c9f1b761c8946269.cn-huhehaote.emr.aliyuncs.com executor 2): 
> java.lang.RuntimeException: Failed to find latest snapshot id
>   at 
> org.apache.flink.table.store.file.utils.SnapshotManager.latestSnapshotId(SnapshotManager.java:81)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.scanExistingFileMetas(AbstractFileStoreWrite.java:87)
>   at 
> org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:113)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.createWriter(AbstractFileStoreWrite.java:227)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.lambda$getWriter$1(AbstractFileStoreWrite.java:217)
>   at java.util.HashMap.computeIfAbsent(HashMap.java:1128)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.getWriter(AbstractFileStoreWrite.java:217)
>   at 
> org.apache.flink.table.store.file.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:106)
>   at 
> org.apache.flink.table.store.table.sink.TableWriteImpl.write(TableWriteImpl.java:63)
>   at 
> org.apache.flink.table.store.spark.SparkWrite$WriteRecords.call(SparkWrite.java:124)
>   at 
> org.apache.flink.table.store.spark.SparkWrite$WriteRecords.call(SparkWrite.java:105)
>   at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:752)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:237)
>   at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:220)
>   at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1431)
>   at org.apache.spark.rdd.RDD.$anonfun$reduce$2(RDD.scala:1097)
>   at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:136)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
> Could not find a file system implementation for scheme 'oss'. The scheme is 
> directly supported by Flink through the following plugin(s): 
> flink-oss-fs-hadoop. Please ensure that each plugin resides within its own 
> subfolder within the plugins directory. See 
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
>  for more information. If you want to use a Hadoop file system for that 
> scheme, please add the scheme to the configuration 
> fs.allowed-fallback-filesystems. For a full list of supported file systems, 
> please see 
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
>   at 
> org.apache.flink.table.store.file.utils.SnapshotManager.findLatest(SnapshotManager.java:164)
>   at 
> org.apache.flink.table.store.file.utils.SnapshotManager.latestSnapshotId(SnapshotManager.java:79)
>   ... 30 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to