Dynamically locate the Hadoop configuration directory from environment variables (HADOOP_HOME or HADOOP_PREFIX)
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/dacf1bc3 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/dacf1bc3 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/dacf1bc3 Branch: refs/heads/steven/hdfs Commit: dacf1bc3f9b195e6998b5da879ca1c88032d8f03 Parents: 35b29a4 Author: efikalti <[email protected]> Authored: Wed Jun 10 18:52:54 2015 +0300 Committer: efikalti <[email protected]> Committed: Wed Jun 10 18:52:54 2015 +0300 ---------------------------------------------------------------------- .../apache/vxquery/hdfs2/HDFSFileFunctions.java | 106 -------------- .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 137 +++++++++++++++++++ .../VXQueryCollectionOperatorDescriptor.java | 77 ++++++----- .../runtime/functions/util/FunctionHelper.java | 38 ++--- 4 files changed, 196 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/dacf1bc3/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java deleted file mode 100644 index 787a70d..0000000 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFileFunctions.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.vxquery.hdfs2; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; - -public class HDFSFileFunctions { - - private final Configuration conf; - private FileSystem fs; - - /** - * Create the configuration and add the paths for core-site and hdfs-site as resources. - * Initialize an instance a hdfs FileSystem for this configuration. - * @param hadoop_conf_filepath - */ - public HDFSFileFunctions(String hadoop_conf_filepath) - { - this.conf = new Configuration(); - conf.addResource(new Path(hadoop_conf_filepath + "core-site.xml")); - conf.addResource(new Path(hadoop_conf_filepath + "hdfs-site.xml")); - - try { - fs = FileSystem.get(conf); - } catch (IOException ex) { - System.err.println(ex); - } - } - - /** - * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function. - * Searches in subdirectories of the home directory too. 
- * @param filename - * @return - */ - public boolean isLocatedInHDFS(String filename) - { - try { - //search file path - if (fs.exists(new Path(filename))) - { - return true; - } - } catch (IOException ex) { - System.err.println(ex); - } - //Search every file and folder in the home directory - if (searchInDirectory(fs.getHomeDirectory(), filename) != null) - { - return true; - } - return false; - } - - /** - * Searches the given directory and subdirectories for the file. - * @param directory to search - * @param filename of file we want - * @return path if file exists in this directory.else return null. - */ - public Path searchInDirectory(Path directory, String filename) - { - //Search every folder in the directory - try { - RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); - String[] parts; - Path path; - while(it.hasNext()) - { - path = it.next().getPath(); - parts = path.toString().split("/"); - if(parts[parts.length-1].equals(filename)) - { - return path; - } - } - } catch (IOException ex) { - System.err.println(ex); - } - return null; - } - - public FileSystem getFileSystem() - { - return this.fs; - } -} http://git-wip-us.apache.org/repos/asf/vxquery/blob/dacf1bc3/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java new file mode 100644 index 0000000..46ef557 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.hdfs2; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import sun.tools.tree.ThisExpression; + +public class HDFSFunctions { + + private final Configuration conf; + private FileSystem fs; + private String conf_path; + private final String conf_folder; + + /** + * Create the configuration and add the paths for core-site and hdfs-site as resources. + * Initialize an instance a hdfs FileSystem for this configuration. + * @param hadoop_conf_filepath + */ + public HDFSFunctions() + { + this.conf_folder = "/etc/hadoop/"; + locateConf(); + System.out.println(this.conf_path); + this.conf = new Configuration(); + conf.addResource(new Path(this.conf_path + "core-site.xml")); + conf.addResource(new Path(this.conf_path + "hdfs-site.xml")); + try { + fs = FileSystem.get(conf); + } catch (IOException ex) { + System.err.println(ex); + } + } + + /** + * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function. + * Searches in subdirectories of the home directory too. 
+ * @param filename + * @return + */ + public boolean isLocatedInHDFS(String filename) + { + try { + //search file path + if (fs.exists(new Path(filename))) + { + return true; + } + } catch (IOException ex) { + System.err.println(ex); + } + //Search every file and folder in the home directory + if (searchInDirectory(fs.getHomeDirectory(), filename) != null) + { + return true; + } + return false; + } + + /** + * Searches the given directory and subdirectories for the file. + * @param directory to search + * @param filename of file we want + * @return path if file exists in this directory.else return null. + */ + public Path searchInDirectory(Path directory, String filename) + { + //Search every folder in the directory + try { + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + String[] parts; + Path path; + while(it.hasNext()) + { + path = it.next().getPath(); + parts = path.toString().split("/"); + if(parts[parts.length-1].equals(filename)) + { + return path; + } + } + } catch (IOException ex) { + System.err.println(ex); + } + return null; + } + + private void locateConf() + { + String conf = System.getenv("HADOOP_HOME"); + if (conf == null) + { + conf = System.getenv("HADOOP_PREFIX"); + if (conf != null) + { + this.conf_path = conf + this.conf_folder; + } + } + else + { + this.conf_path = conf + this.conf_folder;; + } + } + + public FileSystem getFileSystem() + { + if (this.conf_path != null) + { + return this.fs; + } + else + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/dacf1bc3/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 3caafc7..f2f26c8 100644 --- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.vxquery.context.DynamicContext; -import org.apache.vxquery.hdfs2.HDFSFileFunctions; +import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; @@ -60,8 +60,6 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private String[] collectionPartitions; private List<Integer> childSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); - - private final String hdfs_conf_dir = "/home/efi/Utilities/hadoop/etc/hadoop/"; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, RecordDescriptor rDesc) { @@ -128,44 +126,47 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO //else check in HDFS file system else { - HDFSFileFunctions hdfs = new HDFSFileFunctions(hdfs_conf_dir); + HDFSFunctions hdfs = new HDFSFunctions(); FileSystem fs = hdfs.getFileSystem(); - Path directory = new Path(collectionModifiedName); - Path xmlDocument; - try { - if (fs.exists(directory) && fs.isDirectory(directory)) - { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - //read directory files from HDFS - RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); - while (it.hasNext()) - { - xmlDocument = it.next().getPath(); - if (fs.isFile(xmlDocument)) - { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); - } - 
parser.parseHDFSElements(new URI(xmlDocument.getName()), writer, fta, tupleIndex); - } - } + if (fs != null) + { + Path directory = new Path(collectionModifiedName); + Path xmlDocument; + try { + if (fs.exists(directory) && fs.isDirectory(directory)) + { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + //read directory files from HDFS + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + while (it.hasNext()) + { + xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) + { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); + } + parser.parseHDFSElements(new URI(xmlDocument.getName()), writer, fta, tupleIndex); + } + } + } } + else + { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + System.err.println(e); + } catch (IOException e) { + // TODO Auto-generated catch block + System.err.println(e); + } catch (URISyntaxException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - else - { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); - } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - System.err.println(e); - } catch (IOException e) { - // TODO Auto-generated catch block - System.err.println(e); - } catch (URISyntaxException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/dacf1bc3/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java 
b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java index b7ab9a3..801c249 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/util/FunctionHelper.java @@ -41,7 +41,7 @@ import org.apache.vxquery.datamodel.util.DateTime; import org.apache.vxquery.datamodel.values.ValueTag; import org.apache.vxquery.exceptions.ErrorCode; import org.apache.vxquery.exceptions.SystemException; -import org.apache.vxquery.hdfs2.HDFSFileFunctions; +import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.runtime.functions.arithmetic.AbstractArithmeticOperation; import org.apache.vxquery.runtime.functions.cast.CastToDoubleOperation; import org.apache.vxquery.runtime.functions.comparison.AbstractValueComparisonOperation; @@ -1202,25 +1202,27 @@ public class FunctionHelper { //else check in HDFS file system else { - String hdfs_conf_dir = "/home/efi/Utilities/hadoop/etc/hadoop/"; - HDFSFileFunctions hdfs = new HDFSFileFunctions(hdfs_conf_dir); + HDFSFunctions hdfs = new HDFSFunctions(); FileSystem fs = hdfs.getFileSystem(); - Path xmlDocument = new Path(fName); - try { - if (fs.exists(xmlDocument)) - { - parser.parseHDFSDocument(new URI(xmlDocument.getName()), abvs); + if (fs != null) + { + Path xmlDocument = new Path(fName); + try { + if (fs.exists(xmlDocument)) + { + parser.parseHDFSDocument(new URI(xmlDocument.getName()), abvs); + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + System.err.println(e); + } catch (IOException e) { + // TODO Auto-generated catch block + System.err.println(e); + } catch (URISyntaxException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } - } catch (FileNotFoundException e) { - // TODO Auto-generated catch block - System.err.println(e); - } catch (IOException e) { - // TODO Auto-generated catch block - System.err.println(e); - } catch 
(URISyntaxException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + } } }
