Collection with tag rule and changes
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/924a01b4 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/924a01b4 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/924a01b4 Branch: refs/heads/steven/hdfs Commit: 924a01b44e828b0ce931de927be90421e1c9829a Parents: b33b194 Author: efikalti <[email protected]> Authored: Tue Jul 28 14:57:07 2015 +0300 Committer: efikalti <[email protected]> Committed: Tue Jul 28 14:57:07 2015 +0300 ---------------------------------------------------------------------- .../compiler/rewriter/RewriteRuleset.java | 2 + .../rewriter/rules/AbstractCollectionRule.java | 85 +++++ .../rewriter/rules/CollectionWithTagRule.java | 65 ++++ .../vxquery/functions/builtin-functions.xml | 8 + .../org/apache/vxquery/hdfs2/HDFSFunctions.java | 339 +++++++++++++++++-- .../hdfs2/XmlCollectionByTagInputFormat.java | 221 ++++++++++++ .../metadata/VXQueryCollectionDataSource.java | 12 + .../VXQueryCollectionOperatorDescriptor.java | 129 +++++-- 8 files changed, 792 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java index b67402b..1affeed 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/RewriteRuleset.java @@ -19,6 +19,7 @@ package org.apache.vxquery.compiler.rewriter; import java.util.LinkedList; import java.util.List; +import org.apache.vxquery.compiler.rewriter.rules.CollectionWithTagRule; import org.apache.vxquery.compiler.rewriter.rules.ConsolidateAssignAggregateRule; import org.apache.vxquery.compiler.rewriter.rules.ConvertAssignToUnnestRule; import org.apache.vxquery.compiler.rewriter.rules.ConvertFromAlgebricksExpressionsRule; @@ -115,6 +116,7 @@ public class RewriteRuleset { normalization.add(new SetCollectionDataSourceRule()); normalization.add(new IntroduceCollectionRule()); normalization.add(new RemoveUnusedAssignAndAggregateRule()); + normalization.add(new CollectionWithTagRule()); // Adds child steps to the data source scan. // TODO Replace consolidate with a new child function that takes multiple paths. http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java index 717914a..488602f 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/AbstractCollectionRule.java @@ -125,6 +125,91 @@ public abstract class AbstractCollectionRule implements IAlgebraicRewriteRule { return collectionName; } + protected String[] getCollectionWithTagName(Mutable<ILogicalOperator> opRef) throws AlgebricksException { + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) { + return null; + } + UnnestOperator unnest = (UnnestOperator) op; + + // Check if assign is for fn:Collection. + AbstractLogicalOperator op2 = (AbstractLogicalOperator) unnest.getInputs().get(0).getValue(); + if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) { + return null; + } + AssignOperator assign = (AssignOperator) op2; + + // Check to see if the expression is a function and fn:Collection. + ILogicalExpression logicalExpression = (ILogicalExpression) assign.getExpressions().get(0).getValue(); + if (logicalExpression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + return null; + } + AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression; + if (!functionCall.getFunctionIdentifier().equals( + BuiltinFunctions.FN_COLLECTIONWITHTAG_2.getFunctionIdentifier())) { + return null; + } + // Get arguments + int size = functionCall.getArguments().size(); + if(size > 0) + { + String args[] = new String[size]; + for (int i=0; i<functionCall.getArguments().size(); i++) + { + args[i] = getArgument(functionCall, opRef, i); + } + return args; + } + return null; + } + + private String getArgument(AbstractFunctionCallExpression functionCall, Mutable<ILogicalOperator> opRef, int pos) + { + VXQueryConstantValue constantValue; + ILogicalExpression logicalExpression2 = (ILogicalExpression) functionCall.getArguments().get(pos).getValue(); + if (logicalExpression2.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + return null; + } + VariableReferenceExpression vre = (VariableReferenceExpression) logicalExpression2; + Mutable<ILogicalOperator> opRef3 = OperatorToolbox.findProducerOf(opRef, vre.getVariableReference()); + + // Get the string assigned to the collection function. + AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue(); + if (op3.getOperatorTag() == LogicalOperatorTag.ASSIGN) { + AssignOperator assign2 = (AssignOperator) op3; + + // Check to see if the expression is a constant expression and type string. + ILogicalExpression logicalExpression3 = (ILogicalExpression) assign2.getExpressions().get(0).getValue(); + if (logicalExpression3.getExpressionTag() != LogicalExpressionTag.CONSTANT) { + return null; + } + ConstantExpression constantExpression = (ConstantExpression) logicalExpression3; + constantValue = (VXQueryConstantValue) constantExpression.getValue(); + if (constantValue.getType() != SequenceType.create(BuiltinTypeRegistry.XS_STRING, Quantifier.QUANT_ONE)) { + return null; + } + } else { + return null; + } + String args[] = new String[2]; + // Constant value is now in a TaggedValuePointable. Convert the value into a java String. + tvp.set(constantValue.getValue(), 0, constantValue.getValue().length); + String arg = null; + if (tvp.getTag() == ValueTag.XS_STRING_TAG) { + tvp.getValue(stringp); + try { + bbis.setByteBuffer( + ByteBuffer.wrap(Arrays.copyOfRange(stringp.getByteArray(), stringp.getStartOffset(), + stringp.getLength() + stringp.getStartOffset())), 0); + arg = di.readUTF(); + return arg; + } catch (IOException e) { + e.printStackTrace(); + } + } + return null; + } + @Override public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { return false; http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java new file mode 100644 index 0000000..2e730ec --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/CollectionWithTagRule.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.compiler.rewriter.rules; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.vxquery.compiler.rewriter.VXQueryOptimizationContext; +import org.apache.vxquery.metadata.VXQueryCollectionDataSource; +import org.apache.vxquery.types.AnyItemType; +import org.apache.vxquery.types.Quantifier; +import org.apache.vxquery.types.SequenceType; + +import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; + +public class CollectionWithTagRule extends AbstractCollectionRule { + + @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + VXQueryOptimizationContext vxqueryContext = (VXQueryOptimizationContext) context; + String args[] = getCollectionWithTagName(opRef); + + if (args != null) { + // Build the new operator and update the query plan. + int collectionId = vxqueryContext.newCollectionId(); + VXQueryCollectionDataSource ds = VXQueryCollectionDataSource.create(collectionId, args[0], + SequenceType.create(AnyItemType.INSTANCE, Quantifier.QUANT_STAR)); + if (ds != null) { + ds.setTotalDataSources(vxqueryContext.getTotalDataSources()); + ds.setTag(args[1]); + // Known to be true because of collection name. + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + UnnestOperator unnest = (UnnestOperator) op; + Mutable<ILogicalOperator> opRef2 = unnest.getInputs().get(0); + AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue(); + AssignOperator assign = (AssignOperator) op2; + + DataSourceScanOperator opNew = new DataSourceScanOperator(assign.getVariables(), ds); + opNew.getInputs().addAll(assign.getInputs()); + opRef2.setValue(opNew); + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml index 38f03a4..4e5b6de 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml +++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-functions.xml @@ -128,6 +128,14 @@ <!-- Collection operator is added during the rewrite rules phase. --> </function> + <!-- fn:collectionwithtag($arg1 as xs:string?, $arg2 as xs:string?) as node()* --> + <function name="fn:collectionwithtag"> + <param name="arg1" type="xs:string?"/> + <param name="arg2" type="xs:string?"/> + <return type="node()*"/> + <!-- CollectionWithTag operator is added during the rewrite rules phase. --> + </function> + <!-- fn:compare($comparand1 as xs:string?, $comparand2 as xs:string?) as xs:integer? --> <function name="fn:compare"> <param name="comparand1" type="xs:string?"/> http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java index 50037ea..e13adf5 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/HDFSFunctions.java @@ -16,42 +16,92 @@ */ package org.apache.vxquery.hdfs2; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Properties; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.hdfs.ContextFactory; +import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory; + +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class HDFSFunctions { private Configuration conf; private FileSystem fs; private String conf_path; + private Job job; + private InputFormat inputFormat; + private List<InputSplit> splits; + private ArrayList<ArrayList<String>> nodes; + private File nodeXMLfile; + private HashMap<Integer, String> schedule; /** * Create the configuration and add the paths for core-site and hdfs-site as resources. * Initialize an instance of HDFS FileSystem for this configuration. - * - * @param hadoop_conf_filepath */ public HDFSFunctions() { - if (locateConf()) { - this.conf = new Configuration(); + this.conf = new Configuration(); + } - conf.addResource(new Path(this.conf_path + "/core-site.xml")); - conf.addResource(new Path(this.conf_path + "/hdfs-site.xml")); - try { - fs = FileSystem.get(conf); - } catch (IOException ex) { - System.err.println(ex); - } - } else { - System.err.println("Could not locate hdfs configuarion folder."); + /** + * Create the needed objects for reading the splits of the filepath given as argument. + * This method should run before the scheduleSplits method. + * + * @param filepath + */ + @SuppressWarnings({ "deprecation", "unchecked" }) + public void setJob(String filepath, String tag) { + try { + conf.set("start_tag", "<" + tag + ">"); + conf.set("end_tag", "</" + tag + ">"); + job = new Job(conf, "Read from HDFS"); + Path input = new Path(filepath); + FileInputFormat.addInputPath(job, input); + //TODO change input format class to XMLInputFormatClassOneBufferSolution + job.setInputFormatClass(XmlCollectionByTagInputFormat.class); + inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration()); + splits = inputFormat.getSplits(job); + } catch (IOException e) { + System.err.println(e); + } catch (ClassNotFoundException e) { + System.err.println(e); + } catch (InterruptedException e) { + System.err.println(e); } } @@ -71,15 +121,11 @@ public class HDFSFunctions { } catch (IOException ex) { System.err.println(ex); } - //Search every file and folder in the home directory - if (searchInDirectory(fs.getHomeDirectory(), filename) != null) { - return true; - } - return false; + return searchInDirectory(fs.getHomeDirectory(), filename) != null; } /** - * Searches the given directory and subdirectories for the file. + * Searches the given directory for the file. * * @param directory * to search @@ -88,7 +134,7 @@ public class HDFSFunctions { * @return path if file exists in this directory.else return null. */ public Path searchInDirectory(Path directory, String filename) { - //Search every folder in the directory + //Search the files and folder in this Path to find the one matching the filename. try { RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); String[] parts; @@ -114,9 +160,9 @@ public class HDFSFunctions { */ private boolean locateConf() { - this.conf_path = System.getProperty("HDFS_CONF"); if (this.conf_path == null) { + nodeXMLfile = new File("/home/efi/Projects/vxquery/vxquery-server/src/main/resources/conf/local.xml"); // load properties file Properties prop = new Properties(); String propFilePath = "../vxquery-server/src/main/resources/conf/cluster.properties"; @@ -127,22 +173,17 @@ public class HDFSFunctions { try { prop.load(new FileInputStream(propFilePath)); } catch (FileNotFoundException e1) { - e1.printStackTrace(); } catch (IOException e1) { - e1.printStackTrace(); } } catch (IOException e) { - e.printStackTrace(); + System.err.println(e); } // get the property value for HDFS_CONF this.conf_path = prop.getProperty("HDFS_CONF"); - if (this.conf_path == null) { - return false; - } - return true; + return this.conf_path != null; } - return true; + return false; } /** @@ -161,12 +202,12 @@ public class HDFSFunctions { fs.delete(dest, true); //recursive delete } } catch (IOException e) { - e.printStackTrace(); + System.err.println(e); } try { fs.copyFromLocalFile(path, dest); } catch (IOException e) { - e.printStackTrace(); + System.err.println(e); } } return false; @@ -179,14 +220,242 @@ public class HDFSFunctions { * @return */ public FileSystem getFileSystem() { - if (this.conf_path != null) { - return this.fs; + if (locateConf()) { + conf.addResource(new Path(this.conf_path + "/core-site.xml")); + conf.addResource(new Path(this.conf_path + "/hdfs-site.xml")); + try { + fs = FileSystem.get(conf); + return this.fs; + } catch (IOException ex) { + System.err.println(ex); + } } else { - return null; + System.err.println("Could not locate hdfs configuarion folder."); + } + return null; + } + + /** + * Create a HashMap that has as key the hostname and values the splits that belong to this hostname; + * + * @return + * @throws IOException + */ + public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException { + HashMap<String, ArrayList<Integer>> splits_map = new HashMap<String, ArrayList<Integer>>(); + ArrayList<Integer> temp; + int i = 0; + String hostname; + for (InputSplit s : this.splits) { + SplitLocationInfo info[] = s.getLocationInfo(); + hostname = info[0].getLocation(); + if (splits_map.containsKey(hostname)) { + temp = splits_map.get(hostname); + temp.add(i); + } else { + temp = new ArrayList<Integer>(); + temp.add(i); + splits_map.put(hostname, temp); + } + i++; + } + + return splits_map; + } + + public void scheduleSplits() throws IOException { + + schedule = new HashMap<Integer, String>(); + ArrayList<String> empty = new ArrayList<String>(); + HashMap<String, ArrayList<Integer>> splits_map = this.getLocationsOfSplits(); + readNodesFromXML(); + int count = this.splits.size(); + + ArrayList<Integer> splits; + String node; + for (ArrayList<String> info : this.nodes) { + node = info.get(0); + if (splits_map.containsKey(node)) { + splits = splits_map.get(node); + for (Integer split : splits) { + schedule.put(split, node); + count--; + } + splits_map.remove(node); + } else { + empty.add(node); + } + } + + //Check if every split got assigned to a node + if (count != 0) { + ArrayList<Integer> remaining = new ArrayList<Integer>(); + // Find remaining splits + for (InputSplit s : this.splits) { + int i = 0; + if (!schedule.containsKey(i)) { + remaining.add(i); + } + } + + if (empty.size() != 0) { + int node_number = 0; + for (int split : remaining) { + if (node_number == empty.size()) { + node_number = 0; + } + schedule.put(split, empty.get(node_number)); + node_number++; + } + } + } + // TODO remove from here this is for debugging only + for (int s : schedule.keySet()) { + System.out.println("split: " + s + ", host: " + schedule.get(s)); + } + } + + /** + * Read the hostname and the ip address of every node from the xml cluster configuration file. + * Save the information inside an ArrayList. + */ + public void readNodesFromXML() { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder; + try { + dBuilder = dbFactory.newDocumentBuilder(); + Document doc = dBuilder.parse(nodeXMLfile); + doc.getDocumentElement().normalize(); + + nodes = new ArrayList<ArrayList<String>>(); + NodeList nList = doc.getElementsByTagName("node"); + + for (int temp = 0; temp < nList.getLength(); temp++) { + + Node nNode = nList.item(temp); + + if (nNode.getNodeType() == Node.ELEMENT_NODE) { + + Element eElement = (Element) nNode; + ArrayList<String> info = new ArrayList<String>(); + info.add(eElement.getElementsByTagName("id").item(0).getTextContent()); + info.add(eElement.getElementsByTagName("cluster_ip").item(0).getTextContent()); + nodes.add(info); + } + } + } catch (ParserConfigurationException e) { + System.err.println(e); + } catch (SAXException e) { + System.err.println(e); + } catch (IOException e) { + System.err.println(e); + } + } + + /** + * Writes the schedule to a temporary file, then uploads the file to the HDFS. + */ + public void addScheduleToDistributedCache() { + String filepath = "/tmp/splits_schedule.txt"; + String dfs_path = "vxquery_splits_schedule.txt"; + PrintWriter writer; + try { + writer = new PrintWriter(filepath, "UTF-8"); + for (int split : this.schedule.keySet()) { + writer.write(split + "," + this.schedule.get(split)); + } + writer.close(); + } catch (FileNotFoundException e) { + System.err.println(e); + } catch (UnsupportedEncodingException e) { + System.err.println(e); + } + // Add file to HDFS + this.put(filepath, dfs_path); + } + + public RecordReader getReader() { + + List<FileSplit> fileSplits = new ArrayList<FileSplit>(); + for (int i = 0; i < splits.size(); i++) { + fileSplits.add((FileSplit) splits.get(i)); } + FileSplitsFactory splitsFactory; + try { + splitsFactory = new FileSplitsFactory(fileSplits); + List<FileSplit> inputSplits = splitsFactory.getSplits(); + ContextFactory ctxFactory = new ContextFactory(); + int size = inputSplits.size(); + for (int i = 0; i < size; i++) { + /** + * read the split + */ + TaskAttemptContext context; + try { + context = ctxFactory.createContext(job.getConfiguration(), i); + RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context); + reader.initialize(inputSplits.get(i), context); + return reader; + } catch (HyracksDataException e) { + System.err.println(e); + } catch (IOException e) { + System.err.println(e); + } catch (InterruptedException e) { + System.err.println(e); + } + } + } catch (HyracksDataException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + return null; } - public void scheduleSplits() { + /** + * @return schedule. + */ + public HashMap<Integer, String> getSchedule() { + return this.schedule; + } + /** + * Return the splits belonging to this node for the existing schedule. + * + * @param node + * @return + */ + public ArrayList<Integer> getScheduleForNode(String node) { + ArrayList<Integer> node_schedule = new ArrayList<Integer>(); + for (int split : this.schedule.keySet()) { + if (node.equals(this.schedule.get(split))) { + node_schedule.add(split); + } + } + return node_schedule; + } + + public List<InputSplit> getSplits() { + return this.splits; + } + + public Job getJob() { + return this.job; + } + + public InputFormat getinputFormat() { + return this.inputFormat; + } + + public Document convertStringToDocument(String xmlStr) { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder; + try { + builder = factory.newDocumentBuilder(); + Document doc = builder.parse(new InputSource(new StringReader(xmlStr))); + return doc; + } catch (Exception e) { + System.err.println(e); + } + return null; } } http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java new file mode 100644 index 0000000..fa2e4f9 --- /dev/null +++ b/vxquery-core/src/main/java/org/apache/vxquery/hdfs2/XmlCollectionByTagInputFormat.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.hdfs2; + +import com.google.common.io.Closeables; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +import java.io.IOException; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; + +/** + * Reads records that are delimited by a specific begin/end tag. + */ +public class XmlCollectionByTagInputFormat extends TextInputFormat { + + public static String STARTING_TAG; + public static String ENDING_TAG; + + @Override + public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { + try { + STARTING_TAG = context.getConfiguration().get("start_tag"); + ENDING_TAG = context.getConfiguration().get("end_tag"); + return new XmlRecordReader((FileSplit) split, context.getConfiguration()); + } catch (IOException ioe) { + return null; + } + } + + /** + * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified + * by the end tag + */ + public static class XmlRecordReader extends RecordReader<LongWritable, Text> { + + private final byte[] end_tag; + private final byte[] start_tag; + private final long start; + private final long end; + private int current_block = 0; + private final FSDataInputStream fsin; + private final DataOutputBuffer buffer = new DataOutputBuffer(); + private LongWritable currentKey; + private Text currentValue; + BlockLocation[] blocks; + public static byte[] nl = "\n".getBytes(); + + public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { + end_tag = ENDING_TAG.getBytes(Charsets.UTF_8); + start_tag = STARTING_TAG.getBytes(Charsets.UTF_8); + + // open the file and seek to the start of the split + start = split.getStart(); + // set the end of the file + end = start + split.getLength(); + Path file = split.getPath(); + FileSystem fs = file.getFileSystem(conf); + FileStatus fStatus = fs.getFileStatus(file); + blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen()); + // seek the start of file + fsin = fs.open(split.getPath()); + fsin.seek(start); + } + + /** + * Get next block item + * + * @param key + * @param value + * @return + * @throws IOException + */ + private boolean next(LongWritable key, Text value) throws IOException { + // current_block = nextBlock(); + //if (fsin.getPos() < end && current_block < blocks.length) + if (fsin.getPos() < end) { + try { + if (readBlock(true)) { + key.set(fsin.getPos()); + value.set(buffer.getData(), 0, buffer.getLength()); + return true; + } + } finally { + buffer.reset(); + } + } + return false; + } + + @Override + public void close() throws IOException { + Closeables.close(fsin, true); + } + + @Override + public float getProgress() throws IOException { + return (fsin.getPos() - start) / (float) (end - start); + } + + /** + * Read the block from start till end and after that until you find a closing tag + * + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readBlock(boolean withinBlock) throws IOException { + boolean read = false; + + while (true) { + if (fsin.getPos() < end) { + if (readUntilMatch(start_tag, false)) { + buffer.write(start_tag); + readUntilMatch(end_tag, true); + //buffer.write(this.nl); + read = true; + } + } else { + return read; + } + } + } + + /** + * Read from block(s) until you reach the end of file or find a matching bytes with match[] + * + * @param match + * @param withinBlock + * @return + * @throws IOException + */ + private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { + int i = 0; + while (true) { + int b = fsin.read(); + // end of file: + if (b == -1) { + return false; + } + // save to buffer: + if (withinBlock) { + buffer.write(b); + } + + // check if we're matching: + if (b == match[i]) { + i++; + if (i >= match.length) { + return true; + } + } else { + i = 0; + } + // see if we've passed the stop point: + if (!withinBlock && i == 0 && fsin.getPos() >= end) { + return false; + } + } + } + + private int nextBlock() throws IOException { + long pos = fsin.getPos(); + long block_length; + for (int i = 0; i < blocks.length; i++) { + block_length = blocks[i].getOffset() + blocks[i].getLength(); + if (pos == block_length) { + return i + 1; + } + } + return 0; + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + currentKey = new LongWritable(); + currentValue = new Text(); + return next(currentKey, currentValue); + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java index d17a1a9..decff1c 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionDataSource.java @@ -37,6 +37,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { private String[] collectionPartitions; private final List<Integer> childSeq; private int totalDataSources; + private String tag; private final Object[] types; @@ -60,6 +61,7 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { } }; this.childSeq = new ArrayList<Integer>(); + this.tag = null; } public int getTotalDataSources() { @@ -85,6 +87,16 @@ public class VXQueryCollectionDataSource implements IDataSource<String> { public int getPartitionCount() { return collectionPartitions.length; } + + public String getTag() + { + return this.tag; + } + + public void setTag(String tag) + { + this.tag = tag; + } @Override public String getId() { http://git-wip-us.apache.org/repos/asf/vxquery/blob/924a01b4/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index c945395..b7627d0 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -16,13 +16,15 @@ */ package org.apache.vxquery.metadata; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.logging.Level; @@ -34,6 +36,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.vxquery.context.DynamicContext; import org.apache.vxquery.hdfs2.HDFSFunctions; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; @@ -51,6 +59,8 @@ import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import edu.uci.ics.hyracks.hdfs.ContextFactory; +import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory; public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -59,6 +69,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO private String[] collectionPartitions; private List<Integer> childSeq; protected static final Logger LOGGER = Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName()); + private HDFSFunctions hdfs; + private String tag; public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry spec, VXQueryCollectionDataSource ds, RecordDescriptor rDesc) { @@ -68,6 +80,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO totalDataSources = (short) ds.getTotalDataSources(); childSeq = ds.getChildSeq(); recordDescriptors[0] = rDesc; + this.tag = ds.getTag(); } @Override @@ -92,6 +105,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void open() throws HyracksDataException { appender.reset(frame, true); writer.open(); + hdfs = new HDFSFunctions(); } @Override @@ -119,52 +133,99 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO throw new HyracksDataException("Invalid directory parameter (" + nodeId + ":" + collectionDirectory.getAbsolutePath() + ") passed to collection."); } - } - //else check in HDFS file system - else { - //get instance of the HDFS filesystem - HDFSFunctions hdfs = new HDFSFunctions(); + } else { + // Else check in HDFS file system + // Get instance of the HDFS filesystem FileSystem fs = hdfs.getFileSystem(); if (fs != null) { Path directory = new Path(collectionModifiedName); Path xmlDocument; - try { - //check if the path exists and is a directory - if (fs.exists(directory) && fs.isDirectory(directory)) { - for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { - //read every files in the directory - RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); - while (it.hasNext()) { - xmlDocument = it.next().getPath(); - if (fs.isFile(xmlDocument)) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting to read XML document: " + xmlDocument.getName()); - } + if (tag != null) { + String tags[] = tag.split("/"); + String start_tag = "<?xml version=\"1.0\" encoding=\"utf-8\"?>"; + String end_tag = ""; + for (int i = 0; i < tags.length - 1; i++) { + start_tag = start_tag + "<" + tags[i] + ">"; + end_tag = end_tag + "</" + tags[i] + ">"; + } + hdfs.setJob(directory.getName(), tags[tags.length - 1]); + Job job = hdfs.getJob(); + InputFormat inputFormat = hdfs.getinputFormat(); + try { + hdfs.scheduleSplits(); + ArrayList<Integer> schedule = hdfs.getScheduleForNode(InetAddress.getLocalHost() + .getHostName()); + List<InputSplit> splits = hdfs.getSplits(); + List<FileSplit> fileSplits = new ArrayList<FileSplit>(); + for (int i : schedule) { + fileSplits.add((FileSplit) splits.get(i)); + } + FileSplitsFactory splitsFactory = new FileSplitsFactory(fileSplits); + List<FileSplit> inputSplits = splitsFactory.getSplits(); + ContextFactory ctxFactory = new ContextFactory(); + int size = inputSplits.size(); + for (int i = 0; i < size; i++) { + //read split + TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i); + RecordReader reader; + try { + reader = inputFormat.createRecordReader(inputSplits.get(i), context); + reader.initialize(inputSplits.get(i), context); + while (reader.nextKeyValue() == true) { + String value = reader.getCurrentValue().toString(); + String xml = start_tag; + xml = xml + value + end_tag; + System.out.println(xml); //create an input stream to the file currently reading and send it to parser - InputStream in = fs.open(xmlDocument).getWrappedStream(); - parser.parseHDFSElements(in, writer, fta, tupleIndex); + InputStream stream = new ByteArrayInputStream( + xml.getBytes(StandardCharsets.UTF_8)); + parser.parseHDFSElements(stream, writer, fta, i); } + + } catch (InterruptedException e) { + System.err.println(e); } } - } else { - throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" - + collectionDirectory + ") passed to collection."); - } - } catch (FileNotFoundException e) { - System.err.println(e); - } catch (IOException e) { - System.err.println(e); - } - //Check for collection with tags - if (true) { + } catch (IOException e) { + System.err.println(e); + } + } else { try { - System.out.println(InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - e.printStackTrace(); + //check if the path exists and is a directory + if (fs.exists(directory) && fs.isDirectory(directory)) { + for (int tupleIndex = 0; tupleIndex < fta.getTupleCount(); ++tupleIndex) { + //read every file in the directory + RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true); + while (it.hasNext()) { + xmlDocument = it.next().getPath(); + if (fs.isFile(xmlDocument)) { + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine("Starting to read XML document: " + + xmlDocument.getName()); + } + //create an input stream to the file currently reading and send it to parser + InputStream in = fs.open(xmlDocument).getWrappedStream(); + parser.parseHDFSElements(in, writer, fta, tupleIndex); + } + } + } + } else { + throw new HyracksDataException("Invalid HDFS directory parameter (" + nodeId + ":" + + collectionDirectory + ") passed to collection."); + } + } catch (FileNotFoundException e) { + System.err.println(e); + } catch (IOException e) { + System.err.println(e); } } } + try { + fs.close(); + } catch (IOException e) { + System.err.println(e); + } } }
