[
https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Victor Wong closed FLINK-12648.
-------------------------------
Resolution: Won't Fix
> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -----------------------------------------------------------------
>
> Key: FLINK-12648
> URL: https://issues.apache.org/jira/browse/FLINK-12648
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Victor Wong
> Assignee: Victor Wong
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I think there is some duplicated code in
> _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that mirrors logic
> already provided by the Apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI,
> org.apache.hadoop.conf.Configuration)_ to remove the duplication.
>
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>     fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
> }
> catch (IOException e) {
>     throw new UnsupportedFileSystemSchemeException(
>         "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
>         "Either no file system implementation exists for that scheme, " +
>         "or the relevant classes are missing from the classpath.", e);
> }
>
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}",
>     scheme, fsClass.getName());
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
>
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>     initUri = fsUri;
> }
> else {
>     LOG.debug("URI {} does not specify file system authority, trying to load " +
>         "default authority (fs.defaultFS)", fsUri);
>     String configEntry = hadoopConfig.get("fs.defaultFS", null);
>     if (configEntry == null) {
>         // fs.default.name deprecated as of hadoop 2.2.0 - see
>         // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>         configEntry = hadoopConfig.get("fs.default.name", null);
>     }
>     if (LOG.isDebugEnabled()) {
>         LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>     }
>     if (configEntry == null) {
>         throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>             "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
>     }
>     else {
>         try {
>             initUri = URI.create(configEntry);
>         }
>         catch (IllegalArgumentException e) {
>             throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                 "The configuration contains an invalid file system default name " +
>                 "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>         }
>         if (initUri.getAuthority() == null) {
>             throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                 "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
>                 "contains no valid authority component (like hdfs namenode, S3 host, etc)");
>         }
>     }
> }
>
> // -- (5) configure the Hadoop file system
> try {
>     hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>     String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
>         "), specified by either the file URI or the configuration, cannot be resolved.";
>     throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs =
>     org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}
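The bulk of the duplicated logic above (step 4) is a URI-authority fallback: use the requested URI as-is if it names an authority, otherwise fall back to the configured default file system. A minimal, Hadoop-free sketch of that fallback, assuming a plain `Map<String, String>` in place of Hadoop's `Configuration` and a hypothetical `resolveInitUri` helper:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class DefaultFsFallback {

    /**
     * Resolves the URI used to initialize a file system: if the requested
     * URI carries no authority component, fall back to the configured
     * default file system ("fs.defaultFS", or the deprecated
     * "fs.default.name").
     */
    static URI resolveInitUri(URI fsUri, Map<String, String> conf) {
        if (fsUri.getAuthority() != null) {
            return fsUri;
        }
        String entry = conf.get("fs.defaultFS");
        if (entry == null) {
            // fs.default.name is deprecated as of Hadoop 2.2.0
            entry = conf.get("fs.default.name");
        }
        if (entry == null) {
            throw new IllegalArgumentException(
                "No default file system configured ('fs.defaultFS')");
        }
        // URI.create throws IllegalArgumentException on a malformed entry
        URI initUri = URI.create(entry);
        if (initUri.getAuthority() == null) {
            throw new IllegalArgumentException(
                "Default file system contains no valid authority: " + entry);
        }
        return initUri;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.defaultFS", "hdfs://namenode:8020");

        // A URI with an explicit authority is used as-is.
        System.out.println(resolveInitUri(URI.create("hdfs://other:9000/data"), conf));
        // A URI without an authority falls back to fs.defaultFS.
        System.out.println(resolveInitUri(URI.create("hdfs:///data"), conf));
    }
}
```

`org.apache.hadoop.fs.FileSystem#get` performs the same `fs.defaultFS` resolution internally, which is why the single call can stand in for steps 2 through 5.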
--
This message was sent by Atlassian Jira
(v8.3.4#803005)