[ https://issues.apache.org/jira/browse/FLINK-14955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984339#comment-16984339 ]

Simya Jose commented on FLINK-14955:
------------------------------------

FLINK-14170 is related to the Hadoop version. My issue is with the first
condition, where it checks that the file system scheme is "hdfs".

(We are using Hadoop 2.7.3, but the file system type is "swift".)
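
The guard that throws is in HadoopRecoverableWriter's constructor. Paraphrased
in Scala for illustration (the actual source is Java, so this is approximate
and line numbers may differ), the check is roughly:

// Rough paraphrase of the guard in HadoopRecoverableWriter's constructor.
// Scheme and version are folded into a single condition, so a non-"hdfs"
// scheme fails even on Hadoop 2.7.3.
if (!"hdfs".equalsIgnoreCase(fs.getScheme) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
  throw new UnsupportedOperationException(
    "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer")
}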


StreamingFileSink calls into Buckets.java, which in turn calls the
createRecoverableWriter() method on the respective FileSystem.

SwiftFileSystemFactory creates a HadoopFileSystem wrapping the
SwiftFileSystem, so when Buckets.java invokes createRecoverableWriter(), the
call lands in HadoopFileSystem's implementation, where a hard check that the
scheme is "hdfs" fails.
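
A minimal sketch that hits the same guard without running a full job (the
swift URI below is hypothetical, and it assumes flink-swift-fs-hadoop is on
the classpath and configured):

import org.apache.flink.core.fs.Path

object RecoverableWriterRepro {
  def main(args: Array[String]): Unit = {
    // Resolves to HadoopFileSystem(SwiftFileSystem) via SwiftFileSystemFactory.
    val path = new Path("swift://container.provider/tmp/out")
    val fs = path.getFileSystem

    // Throws UnsupportedOperationException from HadoopRecoverableWriter,
    // because the wrapped file system's scheme is "swift", not "hdfs".
    fs.createRecoverableWriter()
  }
}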


> Not able to write to swift via StreamingFileSink.forBulkFormat
> --------------------------------------------------------------
>
>                 Key: FLINK-14955
>                 URL: https://issues.apache.org/jira/browse/FLINK-14955
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.1, 1.9.1
>            Reporter: Simya Jose
>            Priority: Major
>         Attachments: pom.xml
>
>
> Not able to use StreamingFileSink to write to Swift file storage.
>  
> *Code*:
> Flink version: 1.9.1 (tried with 1.8.1 as well; same exception)
> Scala version: 2.11
> Build tool: Maven
> Main part of the code:
> val eligibleItems: DataStream[EligibleItem] = env.fromCollection(Seq(
>   EligibleItem("pencil"),
>   EligibleItem("rubber"),
>   EligibleItem("beer")))(TypeInformation.of(classOf[EligibleItem]))
>
> val factory2: ParquetWriterFactory[EligibleItem] =
>   ParquetAvroWriters.forReflectRecord(classOf[EligibleItem])
>
> val sink: StreamingFileSink[EligibleItem] = StreamingFileSink
>   .forBulkFormat(new Path(capHadoopPath), factory2)
>   .build()
>
> eligibleItems.addSink(sink)
>   .setParallelism(1)
>   .uid("TEST_1")
>   .name("TEST")
> *Scenario*: when the path is set to point to swift (capHadoopPath =
> "swift://<path>"), the following exception is thrown:
> _java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
> only supported for HDFS and for Hadoop version 2.7 or newer
> at org.apache.flink.fs.openstackhadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)_
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
