Changed HDFS_CONF retrieval; the user now defines it in the cluster.properties file.
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/42d74b49 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/42d74b49 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/42d74b49 Branch: refs/heads/steven/hdfs Commit: 42d74b49f83700b5ce3a440b7356569871187a0b Parents: 5c8678c Author: efikalti <[email protected]> Authored: Thu Jul 2 19:19:59 2015 +0300 Committer: efikalti <[email protected]> Committed: Thu Jul 2 19:19:59 2015 +0300 ---------------------------------------------------------------------- pom.xml | 18 ++ vxquery-cli/pom.xml | 11 + .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 210 +++++++++---------- .../VXQueryCollectionOperatorDescriptor.java | 127 +++++------ .../src/main/resources/conf/cluster.properties | 7 +- 5 files changed, 199 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f223129..e16df18 100644 --- a/pom.xml +++ b/pom.xml @@ -275,6 +275,23 @@ <artifactId>hyracks-dataflow-std</artifactId> <version>${hyracks.version}</version> </dependency> + + <dependency> + <groupId>edu.uci.ics.hyracks</groupId> + <artifactId>hyracks-hdfs-core</artifactId> + <version>${hyracks.version}</version> + </dependency> + <dependency> + <groupId>edu.uci.ics.hyracks</groupId> + <artifactId>hyracks-hdfs-2.x</artifactId> + <version>${hyracks.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>2.7.0</version> + </dependency> <dependency> <groupId>ant</groupId> @@ -344,6 +361,7 @@ <version>6.1.4</version> <scope>compile</scope> </dependency> + </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-cli/pom.xml 
---------------------------------------------------------------------- diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml index 620a4a8..4d19fd7 100644 --- a/vxquery-cli/pom.xml +++ b/vxquery-cli/pom.xml @@ -126,7 +126,18 @@ <groupId>edu.uci.ics.hyracks</groupId> <artifactId>hyracks-dataflow-std</artifactId> </dependency> + + <dependency> + <groupId>edu.uci.ics.hyracks</groupId> + <artifactId>hyracks-hdfs-core</artifactId> + </dependency> + <dependency> + <groupId>edu.uci.ics.hyracks</groupId> + <artifactId>hyracks-hdfs-2.x</artifactId> + </dependency> </dependencies> + + <reporting> <plugins> http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java index 24b16da..50037ea 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -16,7 +16,10 @@ */ package org.apache.vxquery.hdfs2; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -25,80 +28,75 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; public class HDFSFunctions { - + private Configuration conf; private FileSystem fs; private String conf_path; - private final String conf_folder = "/etc/hadoop/"; - + /** * Create the configuration and add the paths for core-site and hdfs-site as resources. * Initialize an instance of HDFS FileSystem for this configuration. 
- * @param hadoop_conf_filepath + * + * @param hadoop_conf_filepath */ - public HDFSFunctions() - { - if(locateConf()) - { - this.conf = new Configuration(); - conf.addResource(new Path(this.conf_path + "core-site.xml")); - conf.addResource(new Path(this.conf_path + "hdfs-site.xml")); - try { - fs = FileSystem.get(conf); - } catch (IOException ex) { - System.err.println(ex); - } - } - else - { - //print error cannot locate configuration folder for HDFS - } + public HDFSFunctions() { + if (locateConf()) { + this.conf = new Configuration(); + + conf.addResource(new Path(this.conf_path + "/core-site.xml")); + conf.addResource(new Path(this.conf_path + "/hdfs-site.xml")); + try { + fs = FileSystem.get(conf); + } catch (IOException ex) { + System.err.println(ex); + } + } else { + System.err.println("Could not locate hdfs configuarion folder."); + } } - + /** * Returns true if the file path exists or it is located somewhere in the home directory of the user that called the function. * Searches in subdirectories of the home directory too. + * * @param filename - * @return + * @return */ - public boolean isLocatedInHDFS(String filename) - { - try { - //search file path - if (fs.exists(new Path(filename))) - { - return true; - } - } catch (IOException ex) { - System.err.println(ex); - } + public boolean isLocatedInHDFS(String filename) { + try { + //search file path + if (fs.exists(new Path(filename))) { + return true; + } + } catch (IOException ex) { + System.err.println(ex); + } //Search every file and folder in the home directory - if (searchInDirectory(fs.getHomeDirectory(), filename) != null) - { + if (searchInDirectory(fs.getHomeDirectory(), filename) != null) { return true; } return false; } - + /** * Searches the given directory and subdirectories for the file. 
- * @param directory to search - * @param filename of file we want + * + * @param directory + * to search + * @param filename + * of file we want * @return path if file exists in this directory.else return null. */ - public Path searchInDirectory(Path directory, String filename) - { + public Path searchInDirectory(Path directory, String filename) { //Search every folder in the directory try { RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); String[] parts; Path path; - while(it.hasNext()) - { + while (it.hasNext()) { path = it.next().getPath(); parts = path.toString().split("/"); - if(parts[parts.length-1].equals(filename)) - { + if (parts[parts.length - 1].equals(filename)) { return path; } } @@ -107,98 +105,88 @@ public class HDFSFunctions { } return null; } - + /** - * Search in the system environment variables for the Hadoop/HDFS home folder to get the path to the configuration. - * Variables it is checking are: HADOOP_HOME,HADOOP_PREFIX,HADOOP_CONF_DIR,HADOOP_HDFS_HOME. + * Read the cluster properties file and locate the HDFS_CONF variable that is the directory path for the + * hdfs configuration if the system environment variable HDFS_CONF is not set. 
+ * * @return true if is successfully finds the Hadoop/HDFS home directory */ - private boolean locateConf() - {//HADOOP_HOME - String conf = System.getenv("HADOOP_HOME"); - if (conf == null) - {//HADOOP_PREFIX - conf = System.getenv("HADOOP_PREFIX"); - if (conf != null) - { - this.conf_path = conf + this.conf_folder; - } - else - {//HADOOP_CONF_DIR - conf = System.getenv("HADOOP_CONF_DIR"); - if (conf != null) - { - this.conf_path = conf + this.conf_folder; - } - else - {//HADOOP_HDFS_HOME - conf = System.getenv("HADOOP_HDFS_HOME"); - if (conf != null) - { - this.conf_path = conf + this.conf_folder; - } - else - { - return false; - } - } - } - } - else - { - this.conf_path = conf + this.conf_folder;; - } - return true; + private boolean locateConf() { + + this.conf_path = System.getProperty("HDFS_CONF"); + if (this.conf_path == null) { + + // load properties file + Properties prop = new Properties(); + String propFilePath = "../vxquery-server/src/main/resources/conf/cluster.properties"; + try { + prop.load(new FileInputStream(propFilePath)); + } catch (FileNotFoundException e) { + propFilePath = "vxquery-server/src/main/resources/conf/cluster.properties"; + try { + prop.load(new FileInputStream(propFilePath)); + } catch (FileNotFoundException e1) { + e1.printStackTrace(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } catch (IOException e) { + e.printStackTrace(); + } + + // get the property value for HDFS_CONF + this.conf_path = prop.getProperty("HDFS_CONF"); + if (this.conf_path == null) { + return false; + } + return true; + } + return true; } - + /** * Upload a file/directory to HDFS.Filepath is the path in the local file system.dir is the destination path. 
+ * * @param filepath * @param dir * @return */ - public boolean put(String filepath,String dir) - { + public boolean put(String filepath, String dir) { if (this.fs != null) { Path path = new Path(filepath); Path dest = new Path(dir); - try - { - if (fs.exists(dest)) - { + try { + if (fs.exists(dest)) { fs.delete(dest, true); //recursive delete } - } - catch (IOException e) - { + } catch (IOException e) { e.printStackTrace(); } - try - { + try { fs.copyFromLocalFile(path, dest); - } - catch (IOException e) - { + } catch (IOException e) { e.printStackTrace(); } } - return false; + return false; } - + /** * Get instance of the HDFSfile system if it is configured correctly. * Return null if there is no instance. + * * @return */ - public FileSystem getFileSystem() - { - if (this.conf_path != null) - { - return this.fs; - } - else - { - return null; - } + public FileSystem getFileSystem() { + if (this.conf_path != null) { + return this.fs; + } else { + return null; + } + } + + public void scheduleSplits() { + } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index d0fbfe2..c945395 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -20,6 +20,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -57,7 +59,7 @@ public 
class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private String[] collectionPartitions; private List<Integer> childSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); - + public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, RecordDescriptor rDesc) { super(spec, 1, 1); @@ -96,70 +98,73 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void nextFrame(ByteBuffer buffer) throws HyracksDataException { fta.reset(buffer); String collectionModifiedName = collectionName.replace("${nodeId}", nodeId); - + File collectionDirectory = new File(collectionModifiedName); //check if directory is in the local file system - if(collectionDirectory.exists()) - { - // Go through each tuple. - if (collectionDirectory.isDirectory()) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - @SuppressWarnings("unchecked") - Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), - TrueFileFilter.INSTANCE); - while (it.hasNext()) { - File xmlDocument = it.next(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); - } - parser.parseElements(xmlDocument, writer, fta, tupleIndex); - } - } - } else { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); - } + if (collectionDirectory.exists()) { + // Go through each tuple. 
+ if (collectionDirectory.isDirectory()) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), + TrueFileFilter.INSTANCE); + while (it.hasNext()) { + File xmlDocument = it.next(); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getAbsolutePath()); + } + parser.parseElements(xmlDocument, writer, fta, tupleIndex); + } + } + } else { + throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + + collectionDirectory.getAbsolutePath() + ") passed to collection."); + } } //else check in HDFS file system - else - { - HDFSFunctions hdfs = new HDFSFunctions(); - FileSystem fs = hdfs.getFileSystem(); - if (fs != null) - { - Path directory = new Path(collectionModifiedName); - Path xmlDocument; - try { - if (fs.exists(directory) && fs.isDirectory(directory)) - { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - //read directory files from HDFS - RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); - while (it.hasNext()) - { - xmlDocument = it.next().getPath(); - if (fs.isFile(xmlDocument)) - { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); - } - InputStream in = fs.open(xmlDocument).getWrappedStream(); - parser.parseHDFSElements(in, writer, fta, tupleIndex); - } - } - } - } - else - { - throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" - + collectionDirectory.getAbsolutePath() + ") passed to collection."); - } - } catch (FileNotFoundException e) { - System.err.println(e); - } catch (IOException e) { - System.err.println(e); - } - } + else { + //get instance of the HDFS filesystem + HDFSFunctions hdfs = new HDFSFunctions(); + FileSystem fs = hdfs.getFileSystem(); + if (fs != null) { + Path directory = new Path(collectionModifiedName); + Path 
xmlDocument; + try { + //check if the path exists and is a directory + if (fs.exists(directory) && fs.isDirectory(directory)) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + //read every files in the directory + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + while (it.hasNext()) { + xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); + } + //create an input stream to the file currently reading and send it to parser + InputStream in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSElements(in, writer, fta, tupleIndex); + } + } + } + } else { + throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" + + collectionDirectory + ") passed to collection."); + } + } catch (FileNotFoundException e) { + System.err.println(e); + } catch (IOException e) { + System.err.println(e); + } + + //Check for collection with tags + if (true) { + try { + System.out.println(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-server/src/main/resources/conf/cluster.properties ---------------------------------------------------------------------- diff --git a/vxquery-server/src/main/resources/conf/cluster.properties b/vxquery-server/src/main/resources/conf/cluster.properties index fd015d4..8c7ca66 100644 --- a/vxquery-server/src/main/resources/conf/cluster.properties +++ b/vxquery-server/src/main/resources/conf/cluster.properties @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -52,3 +52,6 @@ NCJAVA_OPTS="-server -Xmx7G -Djava.util.logging.config.file=./vxquery-benchmark/ # debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties" # Yourkit option: -agentpath:/tools/yjp-2014-build-14114/bin/linux-x86-64/libyjpagent.so=port=20001" # Yourkit mac option: -agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling + +#HDFS configuration directory +HDFS_CONF=/home/efi/Utilities/hadoop/etc/hadoop
