Changed HDFS_CONF retrieval: the user now defines HDFS_CONF in the cluster.properties file.


Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/42d74b49
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/42d74b49
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/42d74b49

Branch: refs/heads/steven/hdfs
Commit: 42d74b49f83700b5ce3a440b7356569871187a0b
Parents: 5c8678c
Author: efikalti <[email protected]>
Authored: Thu Jul 2 19:19:59 2015 +0300
Committer: efikalti <[email protected]>
Committed: Thu Jul 2 19:19:59 2015 +0300

----------------------------------------------------------------------
 pom.xml                                         |  18 ++
 vxquery-cli/pom.xml                             |  11 +
 .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 210 +++++++++----------
 .../VXQueryCollectionOperatorDescriptor.java    | 127 +++++------
 .../src/main/resources/conf/cluster.properties  |   7 +-
 5 files changed, 199 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f223129..e16df18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,23 @@
                 <artifactId>hyracks-dataflow-std</artifactId>
                 <version>${hyracks.version}</version>
             </dependency>
+            
+            <dependency>
+                                       <groupId>edu.uci.ics.hyracks</groupId>
+                                       
<artifactId>hyracks-hdfs-core</artifactId>
+                                       <version>${hyracks.version}</version>
+                   </dependency>
+            <dependency>
+                                       <groupId>edu.uci.ics.hyracks</groupId>
+                                       
<artifactId>hyracks-hdfs-2.x</artifactId>
+                                       <version>${hyracks.version}</version>
+                   </dependency>
+                   
+                   <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <version>2.7.0</version>
+               </dependency>
 
             <dependency>
                 <groupId>ant</groupId>
@@ -344,6 +361,7 @@
                 <version>6.1.4</version>
                 <scope>compile</scope>
             </dependency>
+            
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-cli/pom.xml
----------------------------------------------------------------------
diff --git a/vxquery-cli/pom.xml b/vxquery-cli/pom.xml
index 620a4a8..4d19fd7 100644
--- a/vxquery-cli/pom.xml
+++ b/vxquery-cli/pom.xml
@@ -126,7 +126,18 @@
             <groupId>edu.uci.ics.hyracks</groupId>
             <artifactId>hyracks-dataflow-std</artifactId>
         </dependency>
+        
+            <dependency>
+                                       <groupId>edu.uci.ics.hyracks</groupId>
+                                       
<artifactId>hyracks-hdfs-core</artifactId>
+                   </dependency>
+            <dependency>
+                                       <groupId>edu.uci.ics.hyracks</groupId>
+                                       
<artifactId>hyracks-hdfs-2.x</artifactId>
+                   </dependency>
     </dependencies>
+    
+    
 
     <reporting>
         <plugins>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
index 24b16da..50037ea 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -16,7 +16,10 @@
  */
 package org.apache.vxquery.hdfs2;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -25,80 +28,75 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
 public class HDFSFunctions {
-       
+
     private Configuration conf;
     private FileSystem fs;
     private String conf_path;
-    private final String conf_folder = "/etc/hadoop/";
-    
+
     /**
      * Create the configuration and add the paths for core-site and hdfs-site 
as resources.
      * Initialize an instance of HDFS FileSystem for this configuration.
-     * @param hadoop_conf_filepath 
+     * 
+     * @param hadoop_conf_filepath
      */
-    public HDFSFunctions()
-    {
-       if(locateConf())
-       {
-               this.conf = new Configuration();
-               conf.addResource(new Path(this.conf_path + "core-site.xml"));
-               conf.addResource(new Path(this.conf_path + "hdfs-site.xml"));
-               try {
-                   fs =  FileSystem.get(conf);
-               } catch (IOException ex) {
-                   System.err.println(ex);
-               }
-       }
-       else
-       {
-               //print error cannot locate configuration folder for HDFS
-       }
+    public HDFSFunctions() {
+        if (locateConf()) {
+            this.conf = new Configuration();
+
+            conf.addResource(new Path(this.conf_path + "/core-site.xml"));
+            conf.addResource(new Path(this.conf_path + "/hdfs-site.xml"));
+            try {
+                fs = FileSystem.get(conf);
+            } catch (IOException ex) {
+                System.err.println(ex);
+            }
+        } else {
+            System.err.println("Could not locate hdfs configuarion folder.");
+        }
     }
-    
+
     /**
      * Returns true if the file path exists or it is located somewhere in the 
home directory of the user that called the function.
      * Searches in subdirectories of the home directory too.
+     * 
      * @param filename
-     * @return 
+     * @return
      */
-    public boolean isLocatedInHDFS(String filename)
-    {
-        try {
-             //search file path
-             if (fs.exists(new Path(filename)))
-             {
-                 return true;
-             }
-         } catch (IOException ex) {
-             System.err.println(ex);
-         }
+    public boolean isLocatedInHDFS(String filename) {
+        try {
+            //search file path
+            if (fs.exists(new Path(filename))) {
+                return true;
+            }
+        } catch (IOException ex) {
+            System.err.println(ex);
+        }
         //Search every file and folder in the home directory
-        if (searchInDirectory(fs.getHomeDirectory(), filename) != null)
-        {
+        if (searchInDirectory(fs.getHomeDirectory(), filename) != null) {
             return true;
         }
         return false;
     }
-    
+
     /**
      * Searches the given directory and subdirectories for the file.
-     * @param directory to search
-     * @param filename of file we want
+     * 
+     * @param directory
+     *            to search
+     * @param filename
+     *            of file we want
      * @return path if file exists in this directory.else return null.
      */
-    public Path searchInDirectory(Path directory, String filename)
-    {
+    public Path searchInDirectory(Path directory, String filename) {
         //Search every folder in the directory
         try {
             RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, 
true);
             String[] parts;
             Path path;
-            while(it.hasNext())
-            {
+            while (it.hasNext()) {
                 path = it.next().getPath();
                 parts = path.toString().split("/");
-                if(parts[parts.length-1].equals(filename))
-                {
+                if (parts[parts.length - 1].equals(filename)) {
                     return path;
                 }
             }
@@ -107,98 +105,88 @@ public class HDFSFunctions {
         }
         return null;
     }
-    
+
     /**
-     * Search in the system environment variables for the Hadoop/HDFS home 
folder to get the path to the configuration.
-     * Variables it is checking are: 
HADOOP_HOME,HADOOP_PREFIX,HADOOP_CONF_DIR,HADOOP_HDFS_HOME.
+     * Read the cluster properties file and locate the HDFS_CONF variable that 
is the directory path for the
+     * hdfs configuration if the system environment variable HDFS_CONF is not 
set.
+     * 
      * @return true if is successfully finds the Hadoop/HDFS home directory
      */
-    private boolean locateConf()
-    {//HADOOP_HOME
-       String conf = System.getenv("HADOOP_HOME");
-       if (conf == null)
-       {//HADOOP_PREFIX
-               conf = System.getenv("HADOOP_PREFIX");
-               if (conf != null)
-               {
-                       this.conf_path = conf + this.conf_folder;
-               }
-               else
-               {//HADOOP_CONF_DIR
-                       conf = System.getenv("HADOOP_CONF_DIR");
-                       if (conf != null)
-                       {
-                               this.conf_path = conf + this.conf_folder;
-                       }
-                       else
-                       {//HADOOP_HDFS_HOME
-                               conf = System.getenv("HADOOP_HDFS_HOME");
-                               if (conf != null)
-                               {
-                                       this.conf_path = conf + 
this.conf_folder;
-                               }
-                               else
-                               {
-                                       return false;
-                               }
-                       }
-               }
-       }
-       else
-       {
-               this.conf_path = conf + this.conf_folder;;
-       }
-       return true;
+    private boolean locateConf() {
+
+        this.conf_path = System.getProperty("HDFS_CONF");
+        if (this.conf_path == null) {
+
+            // load properties file
+            Properties prop = new Properties();
+            String propFilePath = 
"../vxquery-server/src/main/resources/conf/cluster.properties";
+            try {
+                prop.load(new FileInputStream(propFilePath));
+            } catch (FileNotFoundException e) {
+                propFilePath = 
"vxquery-server/src/main/resources/conf/cluster.properties";
+                try {
+                    prop.load(new FileInputStream(propFilePath));
+                } catch (FileNotFoundException e1) {
+                    e1.printStackTrace();
+                } catch (IOException e1) {
+                    e1.printStackTrace();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            // get the property value for HDFS_CONF
+            this.conf_path = prop.getProperty("HDFS_CONF");
+            if (this.conf_path == null) {
+                return false;
+            }
+            return true;
+        }
+        return true;
     }
-    
+
     /**
      * Upload a file/directory to HDFS.Filepath is the path in the local file 
system.dir is the destination path.
+     * 
      * @param filepath
      * @param dir
      * @return
      */
-    public boolean put(String filepath,String dir)
-    {
+    public boolean put(String filepath, String dir) {
         if (this.fs != null) {
             Path path = new Path(filepath);
             Path dest = new Path(dir);
-            try 
-            {
-                if (fs.exists(dest)) 
-                {
+            try {
+                if (fs.exists(dest)) {
                     fs.delete(dest, true); //recursive delete
                 }
-            } 
-            catch (IOException e) 
-            {
+            } catch (IOException e) {
                 e.printStackTrace();
             }
-            try 
-            {
+            try {
                 fs.copyFromLocalFile(path, dest);
-            } 
-            catch (IOException e) 
-            {
+            } catch (IOException e) {
                 e.printStackTrace();
             }
         }
-       return false;
+        return false;
     }
-    
+
     /**
      * Get instance of the HDFSfile system if it is configured correctly.
      * Return null if there is no instance.
+     * 
      * @return
      */
-    public FileSystem getFileSystem()
-    {
-       if (this.conf_path != null)
-       {
-               return this.fs;
-       }
-       else
-       {
-               return null;
-       }
+    public FileSystem getFileSystem() {
+        if (this.conf_path != null) {
+            return this.fs;
+        } else {
+            return null;
+        }
+    }
+
+    public void scheduleSplits() {
+
     }
 }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index d0fbfe2..c945395 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -20,6 +20,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -57,7 +59,7 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
     private String[] collectionPartitions;
     private List<Integer> childSeq;
     protected static final Logger LOGGER = 
Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
-    
+
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
             RecordDescriptor rDesc) {
         super(spec, 1, 1);
@@ -96,70 +98,73 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 fta.reset(buffer);
                 String collectionModifiedName = 
collectionName.replace("${nodeId}", nodeId);
-                
+
                 File collectionDirectory = new File(collectionModifiedName);
                 //check if directory is in the local file system
-                if(collectionDirectory.exists())
-                {
-                       // Go through each tuple.
-                       if (collectionDirectory.isDirectory()) {
-                           for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
-                               @SuppressWarnings("unchecked")
-                               Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
-                                       TrueFileFilter.INSTANCE);
-                               while (it.hasNext()) {
-                                   File xmlDocument = it.next();
-                                   if (LOGGER.isLoggable(Level.FINE)) {
-                                       LOGGER.fine("Starting to read XML 
document: " + xmlDocument.getAbsolutePath());
-                                   }
-                                   parser.parseElements(xmlDocument, writer, 
fta, tupleIndex);
-                               }
-                           }
-                       } else {
-                           throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
-                                   + collectionDirectory.getAbsolutePath() + 
") passed to collection.");
-                       }
+                if (collectionDirectory.exists()) {
+                    // Go through each tuple.
+                    if (collectionDirectory.isDirectory()) {
+                        for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                            Iterator<File> it = 
FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(),
+                                    TrueFileFilter.INSTANCE);
+                            while (it.hasNext()) {
+                                File xmlDocument = it.next();
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("Starting to read XML 
document: " + xmlDocument.getAbsolutePath());
+                                }
+                                parser.parseElements(xmlDocument, writer, fta, 
tupleIndex);
+                            }
+                        }
+                    } else {
+                        throw new HyracksDataException("Invalid directory 
parameter (" + nodeId + ":"
+                                + collectionDirectory.getAbsolutePath() + ") 
passed to collection.");
+                    }
                 }
                 //else check in HDFS file system
-                else
-                {
-                       HDFSFunctions hdfs = new HDFSFunctions();
-                       FileSystem fs = hdfs.getFileSystem();
-                       if (fs != null)
-                       {
-                               Path directory = new 
Path(collectionModifiedName);
-                               Path xmlDocument;
-                               try {
-                                                       if 
(fs.exists(directory) && fs.isDirectory(directory))
-                                                       {
-                                                               for (int 
tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) {
-                                                               //read 
directory files from HDFS
-                                                               
RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
-                                                           while (it.hasNext())
-                                                           {
-                                                               xmlDocument = 
it.next().getPath();
-                                                               if 
(fs.isFile(xmlDocument))
-                                                               {
-                                                                       if 
(LOGGER.isLoggable(Level.FINE)) {
-                                                                       
LOGGER.fine("Starting to read XML document: " + xmlDocument.getName());
-                                                                   }
-                                                                       
InputStream in = fs.open(xmlDocument).getWrappedStream();
-                                                                       
parser.parseHDFSElements(in, writer, fta, tupleIndex);
-                                                               }
-                                                           }
-                                                               }
-                                                       }
-                                                       else
-                                                       {
-                                                                throw new 
HyracksDataException("Invalid directory parameter (" + nodeId + ":"
-                                                                   + 
collectionDirectory.getAbsolutePath() + ") passed to collection.");
-                                                       }
-                                               } catch (FileNotFoundException 
e) {
-                                                       System.err.println(e);
-                                               } catch (IOException e) {
-                                                       System.err.println(e);
-                                               }
-                       }
+                else {
+                    //get instance of the HDFS filesystem
+                    HDFSFunctions hdfs = new HDFSFunctions();
+                    FileSystem fs = hdfs.getFileSystem();
+                    if (fs != null) {
+                        Path directory = new Path(collectionModifiedName);
+                        Path xmlDocument;
+                        try {
+                            //check if the path exists and is a directory
+                            if (fs.exists(directory) && 
fs.isDirectory(directory)) {
+                                for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
+                                    //read every files in the directory
+                                    RemoteIterator<LocatedFileStatus> it = 
fs.listFiles(directory, true);
+                                    while (it.hasNext()) {
+                                        xmlDocument = it.next().getPath();
+                                        if (fs.isFile(xmlDocument)) {
+                                            if (LOGGER.isLoggable(Level.FINE)) 
{
+                                                LOGGER.fine("Starting to read 
XML document: " + xmlDocument.getName());
+                                            }
+                                            //create an input stream to the 
file currently reading and send it to parser
+                                            InputStream in = 
fs.open(xmlDocument).getWrappedStream();
+                                            parser.parseHDFSElements(in, 
writer, fta, tupleIndex);
+                                        }
+                                    }
+                                }
+                            } else {
+                                throw new HyracksDataException("Invalid HDFS 
directory parameter (" + nodeId + ":"
+                                        + collectionDirectory + ") passed to 
collection.");
+                            }
+                        } catch (FileNotFoundException e) {
+                            System.err.println(e);
+                        } catch (IOException e) {
+                            System.err.println(e);
+                        }
+
+                        //Check for collection with tags
+                        if (true) {
+                            try {
+                                
System.out.println(InetAddress.getLocalHost().getHostName());
+                            } catch (UnknownHostException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/42d74b49/vxquery-server/src/main/resources/conf/cluster.properties
----------------------------------------------------------------------
diff --git a/vxquery-server/src/main/resources/conf/cluster.properties 
b/vxquery-server/src/main/resources/conf/cluster.properties
index fd015d4..8c7ca66 100644
--- a/vxquery-server/src/main/resources/conf/cluster.properties
+++ b/vxquery-server/src/main/resources/conf/cluster.properties
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,3 +52,6 @@ NCJAVA_OPTS="-server -Xmx7G 
-Djava.util.logging.config.file=./vxquery-benchmark/
 # debug option: NCJAVA_OPTS="-Xdebug 
-Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g 
-Djava.util.logging.config.file=logging.properties"
 # Yourkit option: 
-agentpath:/tools/yjp-2014-build-14114/bin/linux-x86-64/libyjpagent.so=port=20001"
 # Yourkit mac option: 
-agentpath:/Applications/YourKit_Java_Profiler.app/bin/mac/libyjpagent.jnilib=sampling
+
+#HDFS configuration directory
+HDFS_CONF=/home/efi/Utilities/hadoop/etc/hadoop

Reply via email to