Changes in formatting, comments, and final variables

Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/5d7d7373
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/5d7d7373
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/5d7d7373

Branch: refs/heads/steven/hdfs
Commit: 5d7d737323fa7a9308a02ff7aca46d7fb094bf65
Parents: dbe1afb
Author: efikalti <[email protected]>
Authored: Thu Aug 20 11:38:02 2015 +0300
Committer: efikalti <[email protected]>
Committed: Thu Aug 20 11:38:02 2015 +0300

----------------------------------------------------------------------
 pom.xml                                         |  39 ++--
 vxquery-core/pom.xml                            |  26 ++-
 .../vxquery/functions/builtin-functions.xml     |   2 +-
 .../org/apache/vxquery/hdfs2/HDFSFunctions.java |  78 +++----
 .../hdfs2/XmlCollectionByTagInputFormat.java    | 221 -------------------
 .../hdfs2/XmlCollectionWithTagInputFormat.java  | 217 ++++++++++++++++++
 .../metadata/VXQueryCollectionDataSource.java   |  12 +-
 .../VXQueryCollectionOperatorDescriptor.java    |  22 +-
 .../java/org/apache/vxquery/xtest/XTest.java    |   3 +-
 .../org/apache/vxquery/xtest/XTestOptions.java  |   2 +-
 10 files changed, 305 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c1c1447..897a0c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,13 +1,19 @@
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
-    license agreements. See the NOTICE file distributed with this work for 
additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    You under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
@@ -285,17 +291,6 @@
             </dependency>
 
             <dependency>
-                <groupId>edu.uci.ics.hyracks</groupId>
-                <artifactId>hyracks-hdfs-core</artifactId>
-                <version>${hyracks.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>edu.uci.ics.hyracks</groupId>
-                <artifactId>hyracks-hdfs-2.x</artifactId>
-                <version>${hyracks.version}</version>
-            </dependency>
-
-            <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-hdfs</artifactId>
                 <version>2.7.0</version>
@@ -476,7 +471,7 @@
                 </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> 
<artifactId>maven-javadoc-plugin</artifactId> 
                 </plugin> </reportPlugins> </configuration> <executions> 
<execution> <id>attach-site</id> 
                 <phase>prepare-package</phase> <goals> <goal>jar</goal> 
</goals> </execution> 
-                </executions> </plugin> -->
+                </executions>< /plugin> -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-site-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/pom.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml
index 7fd8f0e..3e08a45 100644
--- a/vxquery-core/pom.xml
+++ b/vxquery-core/pom.xml
@@ -1,13 +1,19 @@
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
-    license agreements. See the NOTICE file distributed with this work for 
additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    You under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
index 4e5b6de..2b2be80 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml
@@ -128,7 +128,7 @@
         <!-- Collection operator is added during the rewrite rules phase.  -->
     </function>
     
-    <!-- fn:collectionwithtag($arg1  as xs:string?, $arg2 as xs:string?) as  
node()* -->
+    <!-- fn:collection-with-tag($arg1  as xs:string?, $arg2 as xs:string?) as  
node()* -->
     <function name="fn:collectionwithtag">
         <param name="arg1" type="xs:string?"/>
         <param name="arg2" type="xs:string?"/>

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
index 60682d7..7604e48 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java
@@ -69,6 +69,9 @@ public class HDFSFunctions {
     private ArrayList<ArrayList<String>> nodes;
     private File nodeXMLfile;
     private HashMap<Integer, String> schedule;
+    private final String TEMP = "java.io.tmpdir";
+    private final String dfs_path = "vxquery_splits_schedule.txt";
+    private final String filepath = System.getProperty(TEMP) + 
"splits_schedule.txt";
 
     /**
      * Create the configuration and add the paths for core-site and hdfs-site 
as resources.
@@ -92,8 +95,7 @@ public class HDFSFunctions {
             job = new Job(conf, "Read from HDFS");
             Path input = new Path(filepath);
             FileInputFormat.addInputPath(job, input);
-            //TODO change input format class to 
XMLInputFormatClassOneBufferSolution
-            job.setInputFormatClass(XmlCollectionByTagInputFormat.class);
+            job.setInputFormatClass(XmlCollectionWithTagInputFormat.class);
             inputFormat = 
ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
             splits = inputFormat.getSplits(job);
         } catch (IOException e) {
@@ -178,8 +180,7 @@ public class HDFSFunctions {
             }
             // get the property value for HDFS_CONF
             this.conf_path = prop.getProperty("HDFS_CONF");
-            if (this.conf_path == null)
-            {
+            if (this.conf_path == null) {
                 this.conf_path = System.getenv("HADOOP_CONF_DIR");
                 return this.conf_path != null;
             }
@@ -265,7 +266,7 @@ public class HDFSFunctions {
         return splits_map;
     }
 
-    public void scheduleSplits() throws IOException {
+    public void scheduleSplits() throws IOException, 
ParserConfigurationException, SAXException {
 
         schedule = new HashMap<Integer, String>();
         ArrayList<String> empty = new ArrayList<String>();
@@ -311,67 +312,55 @@ public class HDFSFunctions {
                 }
             }
         }
-        // TODO remove from here this is for debugging only
-        for (int s : schedule.keySet()) {
-            System.out.println("split: " + s + ", host: " + schedule.get(s));
-        }
     }
 
     /**
      * Read the hostname and the ip address of every node from the xml cluster 
configuration file.
      * Save the information inside an ArrayList.
+     * 
+     * @throws ParserConfigurationException
+     * @throws IOException
+     * @throws SAXException
      */
-    public void readNodesFromXML() {
+    public void readNodesFromXML() throws ParserConfigurationException, 
SAXException, IOException {
         DocumentBuilderFactory dbFactory = 
DocumentBuilderFactory.newInstance();
         DocumentBuilder dBuilder;
-        try {
-            dBuilder = dbFactory.newDocumentBuilder();
-            Document doc = dBuilder.parse(nodeXMLfile);
-            doc.getDocumentElement().normalize();
+        dBuilder = dbFactory.newDocumentBuilder();
+        Document doc = dBuilder.parse(nodeXMLfile);
+        doc.getDocumentElement().normalize();
 
-            nodes = new ArrayList<ArrayList<String>>();
-            NodeList nList = doc.getElementsByTagName("node");
+        nodes = new ArrayList<ArrayList<String>>();
+        NodeList nList = doc.getElementsByTagName("node");
 
-            for (int temp = 0; temp < nList.getLength(); temp++) {
+        for (int temp = 0; temp < nList.getLength(); temp++) {
 
-                Node nNode = nList.item(temp);
+            Node nNode = nList.item(temp);
 
-                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
+            if (nNode.getNodeType() == Node.ELEMENT_NODE) {
 
-                    Element eElement = (Element) nNode;
-                    ArrayList<String> info = new ArrayList<String>();
-                    
info.add(eElement.getElementsByTagName("id").item(0).getTextContent());
-                    
info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent());
-                    nodes.add(info);
-                }
+                Element eElement = (Element) nNode;
+                ArrayList<String> info = new ArrayList<String>();
+                
info.add(eElement.getElementsByTagName("id").item(0).getTextContent());
+                
info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent());
+                nodes.add(info);
             }
-        } catch (ParserConfigurationException e) {
-            System.err.println(e);
-        } catch (SAXException e) {
-            System.err.println(e);
-        } catch (IOException e) {
-            System.err.println(e);
         }
+
     }
 
     /**
      * Writes the schedule to a temporary file, then uploads the file to the 
HDFS.
+     * 
+     * @throws UnsupportedEncodingException
+     * @throws FileNotFoundException
      */
-    public void addScheduleToDistributedCache() {
-        String filepath = "/tmp/splits_schedule.txt";
-        String dfs_path = "vxquery_splits_schedule.txt";
+    public void addScheduleToDistributedCache() throws FileNotFoundException, 
UnsupportedEncodingException {
         PrintWriter writer;
-        try {
-            writer = new PrintWriter(filepath, "UTF-8");
-            for (int split : this.schedule.keySet()) {
-                writer.write(split + "," + this.schedule.get(split));
-            }
-            writer.close();
-        } catch (FileNotFoundException e) {
-            System.err.println(e);
-        } catch (UnsupportedEncodingException e) {
-            System.err.println(e);
+        writer = new PrintWriter(filepath, "UTF-8");
+        for (int split : this.schedule.keySet()) {
+            writer.write(split + "," + this.schedule.get(split));
         }
+        writer.close();
         // Add file to HDFS
         this.put(filepath, dfs_path);
     }
@@ -407,7 +396,6 @@ public class HDFSFunctions {
                 }
             }
         } catch (HyracksDataException e1) {
-            // TODO Auto-generated catch block
             e1.printStackTrace();
         }
         return null;

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
deleted file mode 100644
index fa2e4f9..0000000
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.vxquery.hdfs2;
-
-import com.google.common.io.Closeables;
-import org.apache.commons.io.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-
-/**
- * Reads records that are delimited by a specific begin/end tag.
- */
-public class XmlCollectionByTagInputFormat extends TextInputFormat {
-
-    public static String STARTING_TAG;
-    public static String ENDING_TAG;
-
-    @Override
-    public RecordReader<LongWritable, Text> createRecordReader(InputSplit 
split, TaskAttemptContext context) {
-        try {
-            STARTING_TAG = context.getConfiguration().get("start_tag");
-            ENDING_TAG = context.getConfiguration().get("end_tag");
-            return new XmlRecordReader((FileSplit) split, 
context.getConfiguration());
-        } catch (IOException ioe) {
-            return null;
-        }
-    }
-
-    /**
-     * XMLRecordReader class to read through a given xml document to output 
xml blocks as records as specified
-     * by the end tag
-     */
-    public static class XmlRecordReader extends RecordReader<LongWritable, 
Text> {
-
-        private final byte[] end_tag;
-        private final byte[] start_tag;
-        private final long start;
-        private final long end;
-        private int current_block = 0;
-        private final FSDataInputStream fsin;
-        private final DataOutputBuffer buffer = new DataOutputBuffer();
-        private LongWritable currentKey;
-        private Text currentValue;
-        BlockLocation[] blocks;
-        public static byte[] nl = "\n".getBytes();
-
-        public XmlRecordReader(FileSplit split, Configuration conf) throws 
IOException {
-            end_tag = ENDING_TAG.getBytes(Charsets.UTF_8);
-            start_tag = STARTING_TAG.getBytes(Charsets.UTF_8);
-
-            // open the file and seek to the start of the split
-            start = split.getStart();
-            // set the end of the file
-            end = start + split.getLength();
-            Path file = split.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FileStatus fStatus = fs.getFileStatus(file);
-            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
-            // seek the start of file
-            fsin = fs.open(split.getPath());
-            fsin.seek(start);
-        }
-
-        /**
-         * Get next block item
-         * 
-         * @param key
-         * @param value
-         * @return
-         * @throws IOException
-         */
-        private boolean next(LongWritable key, Text value) throws IOException {
-            // current_block = nextBlock();
-            //if (fsin.getPos() < end && current_block < blocks.length)
-            if (fsin.getPos() < end) {
-                try {
-                    if (readBlock(true)) {
-                        key.set(fsin.getPos());
-                        value.set(buffer.getData(), 0, buffer.getLength());
-                        return true;
-                    }
-                } finally {
-                    buffer.reset();
-                }
-            }
-            return false;
-        }
-
-        @Override
-        public void close() throws IOException {
-            Closeables.close(fsin, true);
-        }
-
-        @Override
-        public float getProgress() throws IOException {
-            return (fsin.getPos() - start) / (float) (end - start);
-        }
-
-        /**
-         * Read the block from start till end and after that until you find a 
closing tag
-         * 
-         * @param withinBlock
-         * @return
-         * @throws IOException
-         */
-        private boolean readBlock(boolean withinBlock) throws IOException {
-            boolean read = false;
-
-            while (true) {
-                if (fsin.getPos() < end) {
-                    if (readUntilMatch(start_tag, false)) {
-                        buffer.write(start_tag);
-                        readUntilMatch(end_tag, true);
-                        //buffer.write(this.nl);
-                        read = true;
-                    }
-                } else {
-                    return read;
-                }
-            }
-        }
-
-        /**
-         * Read from block(s) until you reach the end of file or find a 
matching bytes with match[]
-         * 
-         * @param match
-         * @param withinBlock
-         * @return
-         * @throws IOException
-         */
-        private boolean readUntilMatch(byte[] match, boolean withinBlock) 
throws IOException {
-            int i = 0;
-            while (true) {
-                int b = fsin.read();
-                // end of file:
-                if (b == -1) {
-                    return false;
-                }
-                // save to buffer:
-                if (withinBlock) {
-                    buffer.write(b);
-                }
-
-                // check if we're matching:
-                if (b == match[i]) {
-                    i++;
-                    if (i >= match.length) {
-                        return true;
-                    }
-                } else {
-                    i = 0;
-                }
-                // see if we've passed the stop point:
-                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
-                    return false;
-                }
-            }
-        }
-
-        private int nextBlock() throws IOException {
-            long pos = fsin.getPos();
-            long block_length;
-            for (int i = 0; i < blocks.length; i++) {
-                block_length = blocks[i].getOffset() + blocks[i].getLength();
-                if (pos == block_length) {
-                    return i + 1;
-                }
-            }
-            return 0;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, 
InterruptedException {
-            return currentKey;
-        }
-
-        @Override
-        public Text getCurrentValue() throws IOException, InterruptedException 
{
-            return currentValue;
-        }
-
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext context) 
throws IOException, InterruptedException {
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException 
{
-            currentKey = new LongWritable();
-            currentValue = new Text();
-            return next(currentKey, currentValue);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
new file mode 100644
index 0000000..1d053b6
--- /dev/null
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.hdfs2;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ */
+public class XmlCollectionWithTagInputFormat extends TextInputFormat {
+
+    public static String STARTING_TAG;
+    public static String ENDING_TAG;
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit 
split, TaskAttemptContext context) {
+        try {
+            STARTING_TAG = context.getConfiguration().get("start_tag");
+            ENDING_TAG = context.getConfiguration().get("end_tag");
+            return new XmlRecordReader((FileSplit) split, 
context.getConfiguration());
+        } catch (IOException ioe) {
+            return null;
+        }
+    }
+
+    /**
+     * XMLRecordReader class to read through a given xml document to output 
xml blocks as records as specified
+     * by the end tag
+     */
+    public static class XmlRecordReader extends RecordReader<LongWritable, 
Text> {
+
+        private final byte[] end_tag;
+        private final byte[] start_tag;
+        private final long start;
+        private final long end;
+        private final FSDataInputStream fsin;
+        private final DataOutputBuffer buffer = new DataOutputBuffer();
+        private LongWritable currentKey;
+        private Text currentValue;
+        BlockLocation[] blocks;
+        public static byte[] nl = "\n".getBytes();
+
+        public XmlRecordReader(FileSplit split, Configuration conf) throws 
IOException {
+            end_tag = ENDING_TAG.getBytes(Charsets.UTF_8);
+            start_tag = STARTING_TAG.getBytes(Charsets.UTF_8);
+
+            // open the file and seek to the start of the split
+            start = split.getStart();
+            // set the end of the file
+            end = start + split.getLength();
+            Path file = split.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            FileStatus fStatus = fs.getFileStatus(file);
+            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
+            // seek the start of file
+            fsin = fs.open(split.getPath());
+            fsin.seek(start);
+        }
+
+        /**
+         * Get next block item
+         * 
+         * @param key
+         * @param value
+         * @return
+         * @throws IOException
+         */
+        private boolean next(LongWritable key, Text value) throws IOException {
+            if (fsin.getPos() < end) {
+                try {
+                    if (readBlock(true)) {
+                        key.set(fsin.getPos());
+                        value.set(buffer.getData(), 0, buffer.getLength());
+                        return true;
+                    }
+                } finally {
+                    buffer.reset();
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void close() throws IOException {
+            Closeables.close(fsin, true);
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return (fsin.getPos() - start) / (float) (end - start);
+        }
+
+        /**
+         * Read the block from start till end and after that until you find a 
closing tag
+         * 
+         * @param withinBlock
+         * @return
+         * @throws IOException
+         */
+        private boolean readBlock(boolean withinBlock) throws IOException {
+            boolean read = false;
+
+            while (true) {
+                if (fsin.getPos() < end) {
+                    if (readUntilMatch(start_tag, false)) {
+                        buffer.write(start_tag);
+                        readUntilMatch(end_tag, true);
+                        read = true;
+                    }
+                } else {
+                    return read;
+                }
+            }
+        }
+
+        /**
+         * Read from block(s) until you reach the end of file or find a 
matching bytes with match[]
+         * 
+         * @param match
+         * @param withinBlock
+         * @return
+         * @throws IOException
+         */
+        private boolean readUntilMatch(byte[] match, boolean withinBlock) 
throws IOException {
+            int i = 0;
+            while (true) {
+                int b = fsin.read();
+                // end of file:
+                if (b == -1) {
+                    return false;
+                }
+                // save to buffer:
+                if (withinBlock) {
+                    buffer.write(b);
+                }
+
+                // check if we're matching:
+                if (b == match[i]) {
+                    i++;
+                    if (i >= match.length) {
+                        return true;
+                    }
+                } else {
+                    i = 0;
+                }
+                // see if we've passed the stop point:
+                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+                    return false;
+                }
+            }
+        }
+
+        private int nextBlock() throws IOException {
+            long pos = fsin.getPos();
+            long block_length;
+            for (int i = 0; i < blocks.length; i++) {
+                block_length = blocks[i].getOffset() + blocks[i].getLength();
+                if (pos == block_length) {
+                    return i + 1;
+                }
+            }
+            return 0;
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException, 
InterruptedException {
+            return currentKey;
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException 
{
+            return currentValue;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) 
throws IOException, InterruptedException {
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException 
{
+            currentKey = new LongWritable();
+            currentValue = new Text();
+            return next(currentKey, currentValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
index decff1c..1044565 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java
@@ -79,7 +79,7 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
     public String[] getPartitions() {
         return collectionPartitions;
     }
-    
+
     public void setPartitions(String[] collectionPartitions) {
         this.collectionPartitions = collectionPartitions;
     }
@@ -87,14 +87,12 @@ public class VXQueryCollectionDataSource implements 
IDataSource<String> {
     public int getPartitionCount() {
         return collectionPartitions.length;
     }
-    
-    public String getTag()
-    {
+
+    public String getTag() {
         return this.tag;
     }
-    
-    public void setTag(String tag)
-    {
+
+    public void setTag(String tag) {
         this.tag = tag;
     }
 

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index 9bdf34c..0231154 100644
--- 
a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ 
b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.commons.lang.StringUtils;
@@ -48,6 +50,7 @@ import org.apache.vxquery.hdfs2.HDFSFunctions;
 import org.apache.vxquery.xmlparser.ITreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.TreeNodeIdProvider;
 import org.apache.vxquery.xmlparser.XMLParser;
+import org.xml.sax.SAXException;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -118,7 +121,6 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                     File collectionDirectory = new 
File(collectionModifiedName);
                     //check if directory is in the local file system
                     if (collectionDirectory.exists()) {
-                        System.out.println("local");
                         // Go through each tuple.
                         if (collectionDirectory.isDirectory()) {
                             for (int tupleIndex = 0; tupleIndex < 
fta.getTupleCount(); ++tupleIndex) {
@@ -138,11 +140,11 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                         }
                     }
                 } else {
-                    collectionModifiedName = 
collectionModifiedName.replaceAll("hdfs:/", "");
                     // Else check in HDFS file system
                     // Get instance of the HDFS filesystem
                     FileSystem fs = hdfs.getFileSystem();
                     if (fs != null) {
+                        collectionModifiedName = 
collectionModifiedName.replaceAll("hdfs:/", "");
                         Path directory = new Path(collectionModifiedName);
                         Path xmlDocument;
                         if (tag != null) {
@@ -201,7 +203,11 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                                 }
 
                             } catch (IOException e) {
-                                System.err.println(e);
+                                e.printStackTrace();
+                            } catch (ParserConfigurationException e1) {
+                                e1.printStackTrace();
+                            } catch (SAXException e1) {
+                                e1.printStackTrace();
                             }
                         } else {
                             try {
@@ -233,11 +239,11 @@ public class VXQueryCollectionOperatorDescriptor extends 
AbstractSingleActivityO
                                 System.err.println(e);
                             }
                         }
-                    }
-                    try {
-                        fs.close();
-                    } catch (IOException e) {
-                        System.err.println(e);
+                        try {
+                            fs.close();
+                        } catch (IOException e) {
+                            System.err.println(e);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
----------------------------------------------------------------------
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java 
b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
index cd3d791..0e2a6e0 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
@@ -80,7 +80,6 @@ public class XTest {
         trf.registerReporters(reporters);
         TestCaseFactory tcf = new TestCaseFactory(trf, eSvc, opts);
         count = tcf.process();
-
     }
 
     synchronized void waitForCompletion() throws InterruptedException {
@@ -110,4 +109,4 @@ public class XTest {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
----------------------------------------------------------------------
diff --git 
a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java 
b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
index 42845a1..80fdd65 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java
@@ -76,4 +76,4 @@ public class XTestOptions {
 
     @Option(name = "-showresult", usage = "Show query result.")
     boolean showResult;
-    }
+}
\ No newline at end of file

Reply via email to