Changes in formatting, comments, and final variables
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/5d7d7373 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/5d7d7373 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/5d7d7373 Branch: refs/heads/steven/hdfs Commit: 5d7d737323fa7a9308a02ff7aca46d7fb094bf65 Parents: dbe1afb Author: efikalti <[email protected]> Authored: Thu Aug 20 11:38:02 2015 +0300 Committer: efikalti <[email protected]> Committed: Thu Aug 20 11:38:02 2015 +0300 ---------------------------------------------------------------------- pom.xml | 39 ++-- vxquery-core/pom.xml | 26 ++- .../vxquery/functions/builtin-functions.xml | 2 +- .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 78 +++---- .../hdfs2/XmlCollectionByTagInputFormat.java | 221 ------------------- .../hdfs2/XmlCollectionWithTagInputFormat.java | 217 ++++++++++++++++++ .../metadata/VXQueryCollectionDataSource.java | 12 +- .../VXQueryCollectionOperatorDescriptor.java | 22 +- .../java/org/apache/vxquery/xtest/XTest.java | 3 +- .../org/apache/vxquery/xtest/XTestOptions.java | 2 +- 10 files changed, 305 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c1c1447..897a0c4 100644 --- a/pom.xml +++ b/pom.xml @@ -1,13 +1,19 @@ -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. 
You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -285,17 +291,6 @@ </dependency> <dependency> - <groupId>edu.uci.ics.hyracks</groupId> - <artifactId>hyracks-hdfs-core</artifactId> - <version>${hyracks.version}</version> - </dependency> - <dependency> - <groupId>edu.uci.ics.hyracks</groupId> - <artifactId>hyracks-hdfs-2.x</artifactId> - <version>${hyracks.version}</version> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.0</version> @@ -476,7 +471,7 @@ </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> </plugin> </reportPlugins> </configuration> <executions> <execution> <id>attach-site</id> <phase>prepare-package</phase> <goals> <goal>jar</goal> </goals> </execution> - </executions> </plugin> --> + </executions>< /plugin> --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-site-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/pom.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/pom.xml b/vxquery-core/pom.xml index 7fd8f0e..3e08a45 100644 --- a/vxquery-core/pom.xml +++ b/vxquery-core/pom.xml @@ -1,13 +1,19 @@ -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. 
You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml index 4e5b6de..2b2be80 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml @@ -128,7 +128,7 @@ <!-- Collection operator is added during the rewrite rules phase. --> </function> - <!-- fn:collectionwithtag($arg1 as xs:string?, $arg2 as xs:string?) as node()* --> + <!-- fn:collection-with-tag($arg1 as xs:string?, $arg2 as xs:string?) 
as node()* --> <function name="fn:collectionwithtag"> <param name="arg1" type="xs:string?"/> <param name="arg2" type="xs:string?"/> http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java index 60682d7..7604e48 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -69,6 +69,9 @@ public class HDFSFunctions { private ArrayList<ArrayList<String>> nodes; private File nodeXMLfile; private HashMap<Integer, String> schedule; + private final String TEMP = "java.io.tmpdir"; + private final String dfs_path = "vxquery_splits_schedule.txt"; + private final String filepath = System.getProperty(TEMP) + "splits_schedule.txt"; /** * Create the configuration and add the paths for core-site and hdfs-site as resources. 
@@ -92,8 +95,7 @@ public class HDFSFunctions { job = new Job(conf, "Read from HDFS"); Path input = new Path(filepath); FileInputFormat.addInputPath(job, input); - //TODO change input format class to XMLInputFormatClassOneBufferSolution - job.setInputFormatClass(XmlCollectionByTagInputFormat.class); + job.setInputFormatClass(XmlCollectionWithTagInputFormat.class); inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration()); splits = inputFormat.getSplits(job); } catch (IOException e) { @@ -178,8 +180,7 @@ public class HDFSFunctions { } // get the property value for HDFS_CONF this.conf_path = prop.getProperty("HDFS_CONF"); - if (this.conf_path == null) - { + if (this.conf_path == null) { this.conf_path = System.getenv("HADOOP_CONF_DIR"); return this.conf_path != null; } @@ -265,7 +266,7 @@ public class HDFSFunctions { return splits_map; } - public void scheduleSplits() throws IOException { + public void scheduleSplits() throws IOException, ParserConfigurationException, SAXException { schedule = new HashMap<Integer, String>(); ArrayList<String> empty = new ArrayList<String>(); @@ -311,67 +312,55 @@ public class HDFSFunctions { } } } - // TODO remove from here this is for debugging only - for (int s : schedule.keySet()) { - System.out.println("split: " + s + ", host: " + schedule.get(s)); - } } /** * Read the hostname and the ip address of every node from the xml cluster configuration file. * Save the information inside an ArrayList. 
+ * + * @throws ParserConfigurationException + * @throws IOException + * @throws SAXException */ - public void readNodesFromXML() { + public void readNodesFromXML() throws ParserConfigurationException, SAXException, IOException { DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder dBuilder; - try { - dBuilder = dbFactory.newDocumentBuilder(); - Document doc = dBuilder.parse(nodeXMLfile); - doc.getDocumentElement().normalize(); + dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.parse(nodeXMLfile); + doc.getDocumentElement().normalize(); - nodes = new ArrayList<ArrayList<String>>(); - NodeList nList = doc.getElementsByTagName("node"); + nodes = new ArrayList<ArrayList<String>>(); + NodeList nList = doc.getElementsByTagName("node"); - for (int temp = 0; temp < nList.getLength(); temp++) { + for (int temp = 0; temp < nList.getLength(); temp++) { - Node nNode = nList.item(temp); + Node nNode = nList.item(temp); - if (nNode.getNodeType() == Node.ELEMENT_NODE) { + if (nNode.getNodeType() == Node.ELEMENT_NODE) { - Element eElement = (Element) nNode; - ArrayList<String> info = new ArrayList<String>(); - info.add(eElement.getElementsByTagName("id").item(0).getTextContent()); - info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent()); - nodes.add(info); - } + Element eElement = (Element) nNode; + ArrayList<String> info = new ArrayList<String>(); + info.add(eElement.getElementsByTagName("id").item(0).getTextContent()); + info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent()); + nodes.add(info); } - } catch (ParserConfigurationException e) { - System.err.println(e); - } catch (SAXException e) { - System.err.println(e); - } catch (IOException e) { - System.err.println(e); } + } /** * Writes the schedule to a temporary file, then uploads the file to the HDFS. 
+ * + * @throws UnsupportedEncodingException + * @throws FileNotFoundException */ - public void addScheduleToDistributedCache() { - String filepath = "/tmp/splits_schedule.txt"; - String dfs_path = "vxquery_splits_schedule.txt"; + public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException { PrintWriter writer; - try { - writer = new PrintWriter(filepath, "UTF-8"); - for (int split : this.schedule.keySet()) { - writer.write(split + "," + this.schedule.get(split)); - } - writer.close(); - } catch (FileNotFoundException e) { - System.err.println(e); - } catch (UnsupportedEncodingException e) { - System.err.println(e); + writer = new PrintWriter(filepath, "UTF-8"); + for (int split : this.schedule.keySet()) { + writer.write(split + "," + this.schedule.get(split)); } + writer.close(); // Add file to HDFS this.put(filepath, dfs_path); } @@ -407,7 +396,6 @@ public class HDFSFunctions { } } } catch (HyracksDataException e1) { - // TODO Auto-generated catch block e1.printStackTrace(); } return null; http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java deleted file mode 100644 index fa2e4f9..0000000 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.vxquery.hdfs2; - -import com.google.common.io.Closeables; -import org.apache.commons.io.Charsets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - -import java.io.IOException; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; - -/** - * Reads records that are delimited by a specific begin/end tag. 
- */ -public class XmlCollectionByTagInputFormat extends TextInputFormat { - - public static String STARTING_TAG; - public static String ENDING_TAG; - - @Override - public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { - try { - STARTING_TAG = context.getConfiguration().get("start_tag"); - ENDING_TAG = context.getConfiguration().get("end_tag"); - return new XmlRecordReader((FileSplit) split, context.getConfiguration()); - } catch (IOException ioe) { - return null; - } - } - - /** - * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified - * by the end tag - */ - public static class XmlRecordReader extends RecordReader<LongWritable, Text> { - - private final byte[] end_tag; - private final byte[] start_tag; - private final long start; - private final long end; - private int current_block = 0; - private final FSDataInputStream fsin; - private final DataOutputBuffer buffer = new DataOutputBuffer(); - private LongWritable currentKey; - private Text currentValue; - BlockLocation[] blocks; - public static byte[] nl = "\n".getBytes(); - - public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { - end_tag = ENDING_TAG.getBytes(Charsets.UTF_8); - start_tag = STARTING_TAG.getBytes(Charsets.UTF_8); - - // open the file and seek to the start of the split - start = split.getStart(); - // set the end of the file - end = start + split.getLength(); - Path file = split.getPath(); - FileSystem fs = file.getFileSystem(conf); - FileStatus fStatus = fs.getFileStatus(file); - blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen()); - // seek the start of file - fsin = fs.open(split.getPath()); - fsin.seek(start); - } - - /** - * Get next block item - * - * @param key - * @param value - * @return - * @throws IOException - */ - private boolean next(LongWritable key, Text value) throws IOException { - // current_block = nextBlock(); - //if (fsin.getPos() < 
end && current_block < blocks.length) - if (fsin.getPos() < end) { - try { - if (readBlock(true)) { - key.set(fsin.getPos()); - value.set(buffer.getData(), 0, buffer.getLength()); - return true; - } - } finally { - buffer.reset(); - } - } - return false; - } - - @Override - public void close() throws IOException { - Closeables.close(fsin, true); - } - - @Override - public float getProgress() throws IOException { - return (fsin.getPos() - start) / (float) (end - start); - } - - /** - * Read the block from start till end and after that until you find a closing tag - * - * @param withinBlock - * @return - * @throws IOException - */ - private boolean readBlock(boolean withinBlock) throws IOException { - boolean read = false; - - while (true) { - if (fsin.getPos() < end) { - if (readUntilMatch(start_tag, false)) { - buffer.write(start_tag); - readUntilMatch(end_tag, true); - //buffer.write(this.nl); - read = true; - } - } else { - return read; - } - } - } - - /** - * Read from block(s) until you reach the end of file or find a matching bytes with match[] - * - * @param match - * @param withinBlock - * @return - * @throws IOException - */ - private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { - int i = 0; - while (true) { - int b = fsin.read(); - // end of file: - if (b == -1) { - return false; - } - // save to buffer: - if (withinBlock) { - buffer.write(b); - } - - // check if we're matching: - if (b == match[i]) { - i++; - if (i >= match.length) { - return true; - } - } else { - i = 0; - } - // see if we've passed the stop point: - if (!withinBlock && i == 0 && fsin.getPos() >= end) { - return false; - } - } - } - - private int nextBlock() throws IOException { - long pos = fsin.getPos(); - long block_length; - for (int i = 0; i < blocks.length; i++) { - block_length = blocks[i].getOffset() + blocks[i].getLength(); - if (pos == block_length) { - return i + 1; - } - } - return 0; - } - - @Override - public LongWritable getCurrentKey() 
throws IOException, InterruptedException { - return currentKey; - } - - @Override - public Text getCurrentValue() throws IOException, InterruptedException { - return currentValue; - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - currentKey = new LongWritable(); - currentValue = new Text(); - return next(currentKey, currentValue); - } - } -} http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java new file mode 100644 index 0000000..1d053b6 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionWithTagInputFormat.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.vxquery.hdfs2; + +import com.google.common.io.Closeables; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +import java.io.IOException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; + +/** + * Reads records that are delimited by a specific begin/end tag. + */ +public class XmlCollectionWithTagInputFormat extends TextInputFormat { + + public static String STARTING_TAG; + public static String ENDING_TAG; + + @Override + public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { + try { + STARTING_TAG = context.getConfiguration().get("start_tag"); + ENDING_TAG = context.getConfiguration().get("end_tag"); + return new XmlRecordReader((FileSplit) split, context.getConfiguration()); + } catch (IOException ioe) { + return null; + } + } + + /** + * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified + * by the end tag + */ + public static class XmlRecordReader extends RecordReader<LongWritable, Text> { + + private final byte[] end_tag; + private final byte[] start_tag; + private final long start; + private final long end; + private final FSDataInputStream fsin; + private final DataOutputBuffer buffer = new DataOutputBuffer(); + private LongWritable currentKey; + private Text currentValue; + BlockLocation[] blocks; + public static byte[] nl = "\n".getBytes(); + 
+ public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { + end_tag = ENDING_TAG.getBytes(Charsets.UTF_8); + start_tag = STARTING_TAG.getBytes(Charsets.UTF_8); + + // open the file and seek to the start of the split + start = split.getStart(); + // set the end of the file + end = start + split.getLength(); + Path file = split.getPath(); + FileSystem fs = file.getFileSystem(conf); + FileStatus fStatus = fs.getFileStatus(file); + blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen()); + // seek the start of file + fsin = fs.open(split.getPath()); + fsin.seek(start); + } + + /** + * Get next block item + * + * @param key + * @param value + * @return + * @throws IOException + */ + private boolean next(LongWritable key, Text value) throws IOException { + if (fsin.getPos() < end) { + try { + if (readBlock(true)) { + key.set(fsin.getPos()); + value.set(buffer.getData(), 0, buffer.getLength()); + return true; + } + } finally { + buffer.reset(); + } + } + return false; + } + + @Override + public void close() throws IOException { + Closeables.close(fsin, true); + } + + @Override + public float getProgress() throws IOException { + return (fsin.getPos() - start) / (float) (end - start); + } + + /** + * Read the block from start till end and after that until you find a closing tag + * + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readBlock(boolean withinBlock) throws IOException { + boolean read = false; + + while (true) { + if (fsin.getPos() < end) { + if (readUntilMatch(start_tag, false)) { + buffer.write(start_tag); + readUntilMatch(end_tag, true); + read = true; + } + } else { + return read; + } + } + } + + /** + * Read from block(s) until you reach the end of file or find a matching bytes with match[] + * + * @param match + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { + int i = 0; + while (true) { + 
int b = fsin.read(); + // end of file: + if (b == -1) { + return false; + } + // save to buffer: + if (withinBlock) { + buffer.write(b); + } + + // check if we're matching: + if (b == match[i]) { + i++; + if (i >= match.length) { + return true; + } + } else { + i = 0; + } + // see if we've passed the stop point: + if (!withinBlock && i == 0 && fsin.getPos() >= end) { + return false; + } + } + } + + private int nextBlock() throws IOException { + long pos = fsin.getPos(); + long block_length; + for (int i = 0; i < blocks.length; i++) { + block_length = blocks[i].getOffset() + blocks[i].getLength(); + if (pos == block_length) { + return i + 1; + } + } + return 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + currentKey = new LongWritable(); + currentValue = new Text(); + return next(currentKey, currentValue); + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index decff1c..1044565 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -79,7 +79,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { public String[] getPartitions() 
{ return collectionPartitions; } - + public void setPartitions(String[] collectionPartitions) { this.collectionPartitions = collectionPartitions; } @@ -87,14 +87,12 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { public int getPartitionCount() { return collectionPartitions.length; } - - public String getTag() - { + + public String getTag() { return this.tag; } - - public void setTag(String tag) - { + + public void setTag(String tag) { this.tag = tag; } http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 9bdf34c..0231154 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import javax.xml.parsers.ParserConfigurationException; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.lang.StringUtils; @@ -48,6 +50,7 @@ import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; +import org.xml.sax.SAXException; import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; @@ -118,7 +121,6 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO File collectionDirectory = new 
File(collectionModifiedName); //check if directory is in the local file system if (collectionDirectory.exists()) { - System.out.println("local"); // Go through each tuple. if (collectionDirectory.isDirectory()) { for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { @@ -138,11 +140,11 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } } else { - collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", ""); // Else check in HDFS file system // Get instance of the HDFS filesystem FileSystem fs = hdfs.getFileSystem(); if (fs != null) { + collectionModifiedName = collectionModifiedName.replaceAll("hdfs:/", ""); Path directory = new Path(collectionModifiedName); Path xmlDocument; if (tag != null) { @@ -201,7 +203,11 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } catch (IOException e) { - System.err.println(e); + e.printStackTrace(); + } catch (ParserConfigurationException e1) { + e1.printStackTrace(); + } catch (SAXException e1) { + e1.printStackTrace(); } } else { try { @@ -233,11 +239,11 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO System.err.println(e); } } - } - try { - fs.close(); - } catch (IOException e) { - System.err.println(e); + try { + fs.close(); + } catch (IOException e) { + System.err.println(e); + } } } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java index cd3d791..0e2a6e0 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java @@ -80,7 +80,6 @@ public class XTest { trf.registerReporters(reporters); TestCaseFactory tcf = new 
TestCaseFactory(trf, eSvc, opts); count = tcf.process(); - } synchronized void waitForCompletion() throws InterruptedException { @@ -110,4 +109,4 @@ public class XTest { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/vxquery/blob/5d7d7373/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java ---------------------------------------------------------------------- diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java index 42845a1..80fdd65 100644 --- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java +++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTestOptions.java @@ -76,4 +76,4 @@ public class XTestOptions { @Option(name = "-showresult", usage = "Show query result.") boolean showResult; - } +} \ No newline at end of file
