[
https://issues.apache.org/jira/browse/FLINK-17253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087415#comment-17087415
]
chaganti spurthi commented on FLINK-17253:
------------------------------------------
Here is the PR that we tested on our cluster and now its actually writing to
our cluster.
[https://github.com/apache/flink/pull/11815]
> Support writing to viewfs for hadoop versions < 2.7 when using
> BulkFormatBuilder in StreamingFileSink
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-17253
> URL: https://issues.apache.org/jira/browse/FLINK-17253
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.8.0, 1.8.1, 1.8.2, 1.8.3, 1.9.0
> Reporter: chaganti spurthi
> Priority: Major
> Labels: pull-request-available
>
> FLINK-14170 introduced Hadoop version check to support older hadoop versions.
> However the check only included "hdfs" scheme but not "viewfs". We are using
> StreamingFileSink to write data to our federated hadoop cluster with cdh-2.6
> hadoop version and we are hit with
> {code:java}
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
> only supported for HDFS and for Hadoop version 2.7 or newer at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at
> java.lang.Thread.run(Thread.java:748)
> {code}
> The change is add viewfs to the scheme check.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)