Repository: incubator-pirk Updated Branches: refs/heads/master 89052a78a -> 7f260e03f
[PIRK-19] Make DataSchema/QuerySchema Agnostic of Persistent Representation -- closes apache/incubator-pirk#28 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/c557fffe Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/c557fffe Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/c557fffe Branch: refs/heads/master Commit: c557fffe76bb4d144fa67b7071d392566115f282 Parents: 89052a7 Author: tellison <[email protected]> Authored: Thu Jul 28 14:56:55 2016 -0400 Committer: eawilliams <[email protected]> Committed: Thu Jul 28 14:56:55 2016 -0400 ---------------------------------------------------------------------- .../pirk/schema/data/DataSchemaLoader.java | 113 +++++----- .../data/partitioner/DataPartitioner.java | 14 +- .../data/partitioner/IPDataPartitioner.java | 12 +- .../partitioner/ISO8601DatePartitioner.java | 23 +- .../partitioner/PrimitiveTypePartitioner.java | 19 +- .../pirk/schema/query/QuerySchemaLoader.java | 216 ++++++++++++------- .../pirk/schema/query/filter/FilterFactory.java | 15 +- src/main/resources/query-schema.xsd | 81 ++++++- 8 files changed, 321 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java index f0cca32..1199cd6 100644 --- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java @@ -30,7 +30,6 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; 
import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; import org.apache.pirk.utils.PIRException; @@ -69,9 +68,9 @@ public class DataSchemaLoader { private static final Logger logger = LoggerFactory.getLogger(DataSchemaLoader.class); - private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, - PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, - PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING)); + private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>( + Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, + PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING)); static { @@ -87,51 +86,53 @@ public class DataSchemaLoader } } - /** Kept for compatibility */ + /* Kept for compatibility */ public static void initialize() throws Exception { initialize(false, null); } - /** Kept for compatibility */ + /* Kept for compatibility */ public static void initialize(boolean hdfs, FileSystem fs) throws Exception { String dataSchemas = SystemConfiguration.getProperty("data.schemas", "none"); - if (!dataSchemas.equals("none")) + if (dataSchemas.equals("none")) { - String[] dataSchemaFiles = dataSchemas.split(","); - for (String schemaFile : dataSchemaFiles) - { - logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs); + return; + } - // Parse and load the schema file into a DataSchema object; place in the schemaMap - DataSchemaLoader loader = new DataSchemaLoader(); - InputStream is; - if (hdfs) - { - is = fs.open(new Path(schemaFile)); - logger.info("hdfs: filePath = " + schemaFile.toString()); - } - else - { - is = new 
FileInputStream(schemaFile); - logger.info("localFS: inputFile = " + schemaFile.toString()); - } + String[] dataSchemaFiles = dataSchemas.split(","); + for (String schemaFile : dataSchemaFiles) + { + logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs); + + // Parse and load the schema file into a DataSchema object; place in the schemaMap + DataSchemaLoader loader = new DataSchemaLoader(); + InputStream is; + if (hdfs) + { + is = fs.open(new Path(schemaFile)); + logger.info("hdfs: filePath = " + schemaFile); + } + else + { + is = new FileInputStream(schemaFile); + logger.info("localFS: inputFile = " + schemaFile); + } - try - { - DataSchema dataSchema = loader.loadSchema(is); - DataSchemaRegistry.put(dataSchema); - } finally - { - is.close(); - } + try + { + DataSchema dataSchema = loader.loadSchema(is); + DataSchemaRegistry.put(dataSchema); + } finally + { + is.close(); } } } - /* - * Default constructor + /** + * Default constructor. */ public DataSchemaLoader() {} @@ -149,18 +150,8 @@ public class DataSchemaLoader */ public DataSchema loadSchema(InputStream stream) throws IOException, PIRException { - // Read in and parse the XML schema file - Document doc; - try - { - DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - doc = dBuilder.parse(stream); - } catch (ParserConfigurationException | SAXException e) - { - throw new PIRException("Data schema parsing error", e); - } - doc.getDocumentElement().normalize(); - logger.info("Root element: " + doc.getDocumentElement().getNodeName()); + // Read the XML schema file. 
+ Document doc = parseXMLDocument(stream); // Extract the schemaName NodeList schemaNameList = doc.getElementsByTagName("schemaName"); @@ -181,19 +172,34 @@ public class DataSchemaLoader Node nNode = nList.item(i); if (nNode.getNodeType() == Node.ELEMENT_NODE) { - parseElementNode((Element) nNode, dataSchema); + extractElementNode((Element) nNode, dataSchema); } } return dataSchema; } - private void parseElementNode(Element eElement, DataSchema schema) throws PIRException + private Document parseXMLDocument(InputStream stream) throws IOException, PIRException + { + Document doc; + try + { + DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + doc = dBuilder.parse(stream); + } catch (ParserConfigurationException | SAXException e) + { + throw new PIRException("Schema parsing error", e); + } + doc.getDocumentElement().normalize(); + logger.info("Root element: " + doc.getDocumentElement().getNodeName()); + + return doc; + } + + private void extractElementNode(Element eElement, DataSchema schema) throws PIRException { // Pull out the element name and type attributes. String name = eElement.getElementsByTagName("name").item(0).getTextContent().trim(); - schema.getTextRep().put(name, new Text(name)); - String type = eElement.getElementsByTagName("type").item(0).getTextContent().trim(); schema.getTypeMap().put(name, type); @@ -249,6 +255,11 @@ public class DataSchemaLoader } } + /* + * Creates a new instance of a class with the given type name. + * + * Throws an exception if the class cannot be instantiated, or it does not implement the required interface. 
+ */ DataPartitioner instantiatePartitioner(String partitionerTypeName) throws PIRException { Object obj; @@ -259,7 +270,7 @@ public class DataSchemaLoader obj = c.newInstance(); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | ClassCastException e) { - throw new PIRException("partitioner = " + partitionerTypeName + " cannot be instantiated or does not implement DataParitioner.", e); + throw new PIRException("partitioner = " + partitionerTypeName + " cannot be instantiated or does not implement DataPartitioner.", e); } return (DataPartitioner) obj; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java index cd1e632..8981dcf 100644 --- a/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java +++ b/src/main/java/org/apache/pirk/schema/data/partitioner/DataPartitioner.java @@ -23,6 +23,8 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.List; +import org.apache.pirk.utils.PIRException; + /** * Interface for data partitioning * <p> @@ -35,32 +37,32 @@ public interface DataPartitioner extends Serializable * <p> * If the Object does not have/need a specific type identifier, use null */ - ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception; + ArrayList<BigInteger> toPartitions(Object object, String type) throws PIRException; /** * Method to reconstruct an Object given an ArrayList of its BigInteger partition elements and its type identifier * <p> * If the Object does not have/need a specific type identifier, use null */ - Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception; + Object 
fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException; /** * Method to return the number of bits of an object with the given type */ - int getBits(String type) throws Exception; + int getBits(String type) throws PIRException; /** * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value */ - ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception; + ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException; /** * Method to get an empty set of partitions by data type - used for padding return array values */ - ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception; + ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException; /** * Method to get the number of partitions of the data object given the type */ - int getNumPartitions(String type) throws Exception; + int getNumPartitions(String type) throws PIRException; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java index 494aba0..6f458e2 100644 --- a/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java +++ b/src/main/java/org/apache/pirk/schema/data/partitioner/IPDataPartitioner.java @@ -34,7 +34,7 @@ public class IPDataPartitioner implements DataPartitioner private static final long serialVersionUID = 1L; @Override - public ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception + public ArrayList<BigInteger> toPartitions(Object object, String type) { ArrayList<BigInteger> parts = new ArrayList<>(); @@ 
-48,7 +48,7 @@ public class IPDataPartitioner implements DataPartitioner } @Override - public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception + public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) { Object element; @@ -59,13 +59,13 @@ public class IPDataPartitioner implements DataPartitioner } @Override - public int getBits(String type) throws Exception + public int getBits(String type) { return Integer.SIZE; } @Override - public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception + public ArrayList<BigInteger> getPaddedPartitions(String type) { ArrayList<BigInteger> parts = new ArrayList<>(); @@ -80,7 +80,7 @@ public class IPDataPartitioner implements DataPartitioner * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value */ @Override - public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception + public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) { ArrayList<BigInteger> parts = new ArrayList<>(); @@ -101,7 +101,7 @@ public class IPDataPartitioner implements DataPartitioner } @Override - public int getNumPartitions(String type) throws Exception + public int getNumPartitions(String type) { return 4; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java index 715bf15..329a083 100644 --- a/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java +++ b/src/main/java/org/apache/pirk/schema/data/partitioner/ISO8601DatePartitioner.java @@ 
-19,10 +19,12 @@ package org.apache.pirk.schema.data.partitioner; import java.math.BigInteger; +import java.text.ParseException; import java.util.ArrayList; import java.util.List; import org.apache.pirk.utils.ISO8601DateParser; +import org.apache.pirk.utils.PIRException; /** * Partitioner class for ISO8061 dates @@ -41,15 +43,22 @@ public class ISO8601DatePartitioner implements DataPartitioner } @Override - public ArrayList<BigInteger> toPartitions(Object object, String type) throws Exception + public ArrayList<BigInteger> toPartitions(Object object, String type) throws PIRException { - long dateLongFormat = ISO8601DateParser.getLongDate((String) object); + long dateLongFormat; + try + { + dateLongFormat = ISO8601DateParser.getLongDate((String) object); + } catch (ParseException e) + { + throw new PIRException("Unable to parse ISO8601 date " + object, e); + } return ptp.toPartitions(dateLongFormat, PrimitiveTypePartitioner.LONG); } @Override - public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception + public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException { long dateLongFormat = (long) ptp.fromPartitions(parts, partsIndex, PrimitiveTypePartitioner.LONG); @@ -57,25 +66,25 @@ public class ISO8601DatePartitioner implements DataPartitioner } @Override - public int getBits(String type) throws Exception + public int getBits(String type) { return Long.SIZE; } @Override - public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception + public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException { return ptp.arrayToPartitions(elementList, PrimitiveTypePartitioner.LONG); } @Override - public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception + public ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException { return 
ptp.getPaddedPartitions(PrimitiveTypePartitioner.LONG); } @Override - public int getNumPartitions(String type) throws Exception + public int getNumPartitions(String type) throws PIRException { return ptp.getNumPartitions(PrimitiveTypePartitioner.LONG); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java index 3688e81..e0473f6 100644 --- a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java +++ b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.http.util.ByteArrayBuffer; +import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +109,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * */ @Override - public int getNumPartitions(String type) throws Exception + public int getNumPartitions(String type) throws PIRException { int partitionSize = 8; @@ -140,7 +141,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / partitionSize; break; default: - throw new Exception("type = " + type + " not recognized!"); + throw new PIRException("type = " + type + " not recognized!"); } return numParts; } @@ -149,7 +150,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Get the bit size of the allowed primitive java types */ @Override - public int getBits(String type) throws Exception + public int getBits(String type) throws PIRException { int bits; 
switch (type) @@ -179,7 +180,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner bits = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")); break; default: - throw new Exception("type = " + type + " not recognized!"); + throw new PIRException("type = " + type + " not recognized!"); } return bits; } @@ -188,7 +189,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Reconstructs the object from the partitions */ @Override - public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws Exception + public Object fromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type) throws PIRException { Object element; @@ -242,7 +243,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner break; } default: - throw new Exception("type = " + type + " not recognized!"); + throw new PIRException("type = " + type + " not recognized!"); } return element; } @@ -264,7 +265,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Partitions an object to an ArrayList of BigInteger values, currently represents an 8-bit partitioning */ @Override - public ArrayList<BigInteger> toPartitions(Object obj, String type) throws Exception + public ArrayList<BigInteger> toPartitions(Object obj, String type) throws PIRException { ArrayList<BigInteger> parts = new ArrayList<>(); @@ -378,7 +379,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Method to get an empty set of partitions by data type - used for padding return array values */ @Override - public ArrayList<BigInteger> getPaddedPartitions(String type) throws Exception + public ArrayList<BigInteger> getPaddedPartitions(String type) throws PIRException { ArrayList<BigInteger> parts = new ArrayList<>(); @@ -433,7 +434,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Create partitions for an array of the same type of elements - used when a data value field is an array and we 
wish to encode these into the return value */ @Override - public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws Exception + public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException { ArrayList<BigInteger> parts = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java index a8445ca..d767f2d 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java @@ -18,13 +18,16 @@ */ package org.apache.pirk.schema.query; -import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +36,7 @@ import org.apache.pirk.schema.data.DataSchemaRegistry; import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.query.filter.DataFilter; import org.apache.pirk.schema.query.filter.FilterFactory; +import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +44,7 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; /** * Class to load any query schemas specified in the properties file, 'query.schemas' @@ -85,79 +90,102 @@ public class 
QuerySchemaLoader } } + /* Kept for compatibility */ public static void initialize() throws Exception { initialize(false, null); } + /* Kept for compatibility */ public static void initialize(boolean hdfs, FileSystem fs) throws Exception { String querySchemas = SystemConfiguration.getProperty("query.schemas", "none"); - if (!querySchemas.equals("none")) + if (querySchemas.equals("none")) { - String[] querySchemaFiles = querySchemas.split(","); - for (String schemaFile : querySchemaFiles) + return; + } + String[] querySchemaFiles = querySchemas.split(","); + for (String schemaFile : querySchemaFiles) + { + logger.info("Loading schemaFile = " + schemaFile); + + // Parse and load the schema file into a QuerySchema object; place in the schemaMap + QuerySchemaLoader loader = new QuerySchemaLoader(); + InputStream is; + if (hdfs) { - logger.info("Loading schemaFile = " + schemaFile); + is = fs.open(new Path(schemaFile)); + logger.info("hdfs: filePath = " + schemaFile); + } + else + { + is = new FileInputStream(schemaFile); + logger.info("localFS: inputFile = " + schemaFile); + } - // Parse and load the schema file into a QuerySchema object; place in the schemaMap - QuerySchema querySchema = loadQuerySchemaFile(schemaFile, hdfs, fs); + try + { + QuerySchema querySchema = loader.loadSchema(is); QuerySchemaRegistry.put(querySchema); + } finally + { + is.close(); } } } - private static QuerySchema loadQuerySchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception + /** + * Default constructor. 
+ */ + public QuerySchemaLoader() { - QuerySchema querySchema; - DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); - DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + } - // Read in and parse the schema file - Document doc; - if (hdfs) - { - Path filePath = new Path(schemaFile); - doc = dBuilder.parse(fs.open(filePath)); - logger.info("hdfs: filePath = " + filePath.toString()); - } - else - { - File inputFile = new File(schemaFile); - doc = dBuilder.parse(inputFile); - logger.info("localFS: inputFile = " + inputFile.toString()); - } - doc.getDocumentElement().normalize(); - logger.info("Root element: " + doc.getDocumentElement().getNodeName()); + /** + * Returns the query schema as defined in XML format on the given stream. + * + * @param stream + * The source of the XML query schema description. + * @return The query schema. + * @throws IOException + * A problem occurred reading from the given stream. + * @throws PIRException + * The schema description is invalid. + */ + public QuerySchema loadSchema(InputStream stream) throws IOException, PIRException + { + // Read in and parse the XML file. + Document doc = parseXMLDocument(stream); - // Extract the schemaName + // Extract the schemaName. String schemaName = extractValue(doc, "schemaName"); logger.info("schemaName = " + schemaName); - // Extract the dataSchemaName + // Extract the dataSchemaName. String dataSchemaName = extractValue(doc, "dataSchemaName"); logger.info("dataSchemaName = " + dataSchemaName); + // We must have a matching data schema for this query. DataSchema dataSchema = DataSchemaRegistry.get(dataSchemaName); if (dataSchema == null) { - throw new Exception("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName); + throw new PIRException("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName); } - // Extract the selectorName + // Extract the selectorName, and ensure it matches an element in the data schema. 
String selectorName = extractValue(doc, "selectorName"); logger.info("selectorName = " + selectorName); if (!dataSchema.containsElement(selectorName)) { - throw new Exception("dataSchema = " + dataSchemaName + " does not contain selectorName = " + selectorName); + throw new PIRException("dataSchema = " + dataSchemaName + " does not contain selectorName = " + selectorName); } - // Extract the elements + // Extract the query elements. NodeList elementsList = doc.getElementsByTagName("elements"); - if (elementsList.getLength() > 1) + if (elementsList.getLength() != 1) { - throw new Exception("elementsList.getLength() = " + elementsList.getLength() + " -- should be 1"); + throw new PIRException("elementsList.getLength() = " + elementsList.getLength() + " -- should be 1"); } Element elements = (Element) elementsList.item(0); @@ -169,29 +197,27 @@ public class QuerySchemaLoader Node nNode = nList.item(i); if (nNode.getNodeType() == Node.ELEMENT_NODE) { - Element eElement = (Element) nNode; - - // Pull the name and add to the TreeSet - String name = eElement.getFirstChild().getNodeValue().trim(); - elementNames.add(name); - - // Compute the number of bits for this element - logger.info("name = " + name); - logger.info("partitionerName = " + dataSchema.getPartitionerTypeName(name)); - if ((dataSchema.getPartitionerForElement(name)) == null) + // Pull the name + String queryElementName = ((Element) nNode).getFirstChild().getNodeValue().trim(); + if (!dataSchema.containsElement(queryElementName)) { - logger.info("partitioner is null"); + throw new PIRException("dataSchema = " + dataSchemaName + " does not contain requested element name = " + queryElementName); } - int bits = ((DataPartitioner) dataSchema.getPartitionerForElement(name)).getBits(dataSchema.getElementType(name)); + elementNames.add(queryElementName); + logger.info("name = " + queryElementName + " partitionerName = " + dataSchema.getPartitionerTypeName(queryElementName)); - // Multiply by the number of array 
elements allowed, if applicable - if (dataSchema.getArrayElements().contains(name)) + // Compute the number of bits for this element. + DataPartitioner partitioner = dataSchema.getPartitionerForElement(queryElementName); + int bits = partitioner.getBits(dataSchema.getElementType(queryElementName)); + + // Multiply by the number of array elements allowed, if applicable. + if (dataSchema.isArrayElement(queryElementName)) { bits *= Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements")); } dataElementSize += bits; - logger.info("name = " + name + " bits = " + bits + " dataElementSize = " + dataElementSize); + logger.info("name = " + queryElementName + " bits = " + bits + " dataElementSize = " + dataElementSize); } } @@ -202,62 +228,88 @@ public class QuerySchemaLoader filterTypeName = doc.getElementsByTagName("filter").item(0).getTextContent().trim(); } - // Extract the filterNames, if they exist - HashSet<String> filterNamesSet = new HashSet<>(); - if (doc.getElementsByTagName("filterNames").item(0) != null) + // Create a filter over the query elements. + Set<String> filteredNamesSet = extractFilteredElementNames(doc); + DataFilter filter = instantiateFilter(filterTypeName, filteredNamesSet); + + // Create and return the query schema object. + QuerySchema querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize); + querySchema.getElementNames().addAll(elementNames); + querySchema.getFilteredElementNames().addAll(filteredNamesSet); + return querySchema; + } + + /* + * Parses and normalizes the XML document available on the given stream. 
+ */ + private Document parseXMLDocument(InputStream stream) throws IOException, PIRException + { + Document doc; + try + { + DocumentBuilder dBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + doc = dBuilder.parse(stream); + } catch (ParserConfigurationException | SAXException e) + { + throw new PIRException("Schema parsing error", e); + } + doc.getDocumentElement().normalize(); + logger.info("Root element: " + doc.getDocumentElement().getNodeName()); + + return doc; + } + + /* + * Returns the possibly empty set of element names over which the filter is applied, maintaining document order. + */ + private Set<String> extractFilteredElementNames(Document doc) throws PIRException + { + HashSet<String> filteredNamesSet = new HashSet<>(); + + NodeList filterNamesList = doc.getElementsByTagName("filterNames"); + if (filterNamesList.getLength() != 0) { - NodeList filterNamesList = doc.getElementsByTagName("filterNames"); if (filterNamesList.getLength() > 1) { - throw new Exception("filterNamesList.getLength() = " + filterNamesList.getLength() + " -- should be 1"); + throw new PIRException("filterNamesList.getLength() = " + filterNamesList.getLength() + " -- should be 0 or 1"); } - Element filterNames = (Element) filterNamesList.item(0); - NodeList filterNList = filterNames.getElementsByTagName("name"); + // Extract element names from the list. + Element foo = (Element) filterNamesList.item(0); + NodeList filterNList = ((Element) filterNamesList.item(0)).getElementsByTagName("name"); for (int i = 0; i < filterNList.getLength(); i++) { Node nNode = filterNList.item(i); if (nNode.getNodeType() == Node.ELEMENT_NODE) { - Element eElement = (Element) nNode; - - // Pull the name and add to the TreeSet - String name = eElement.getFirstChild().getNodeValue().trim(); - filterNamesSet.add(name); + // Pull the name and add to the set. 
+ String name = ((Element) nNode).getFirstChild().getNodeValue().trim(); + filteredNamesSet.add(name); logger.info("filterName = " + name); } } } - - // Create the query schema object - - DataFilter filter = instantiateFilter(filterTypeName, filterNamesSet); - querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize); - querySchema.getElementNames().addAll(elementNames); - querySchema.getFilteredElementNames().addAll(filterNamesSet); - return querySchema; + return filteredNamesSet; } - /** - * Extracts a top level, single value from the xml structure + /* + * Extracts a top level, single value from the XML structure. + * + * Throws an exception if there is not exactly one tag with the given name. */ - private static String extractValue(Document doc, String valueName) throws Exception + private String extractValue(Document doc, String tagName) throws PIRException { - NodeList itemList = doc.getElementsByTagName(valueName); - if (itemList.getLength() > 1) + NodeList itemList = doc.getElementsByTagName(tagName); + if (itemList.getLength() != 1) { - throw new Exception("itemList.getLength() = " + itemList.getLength() + " -- should be 1"); + throw new PIRException("itemList.getLength() = " + itemList.getLength() + " -- should be 1"); } return itemList.item(0).getTextContent().trim(); } - private static DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws Exception + private DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws IOException, PIRException { - if (!filterTypeName.equals(NO_FILTER)) - { - return FilterFactory.getFilter(filterTypeName, filteredElementNames); - } - return null; + return filterTypeName.equals(NO_FILTER) ? 
null : FilterFactory.getFilter(filterTypeName, filteredElementNames); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java index bfef7e3..c44e1e8 100644 --- a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java +++ b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java @@ -21,6 +21,7 @@ package org.apache.pirk.schema.query.filter; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import java.io.IOException; import java.io.InputStreamReader; import java.util.HashSet; import java.util.Set; @@ -28,6 +29,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; /** @@ -35,7 +37,7 @@ import org.apache.pirk.utils.SystemConfiguration; */ public class FilterFactory { - public static DataFilter getFilter(String filterName, Set<String> filteredElementNames) throws Exception + public static DataFilter getFilter(String filterName, Set<String> filteredElementNames) throws IOException, PIRException { Object obj = null; @@ -72,11 +74,14 @@ public class FilterFactory else { // Instantiate and validate the interface implementation - Class c = Class.forName(filterName); - obj = c.newInstance(); - if (!(obj instanceof DataFilter)) + try { - throw new Exception("filterName = " + filterName + " DOES NOT implement the DataFilter interface"); + @SuppressWarnings("unchecked") + Class<? extends DataFilter> c = (Class<? 
extends DataFilter>) Class.forName(filterName); + obj = c.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | ClassCastException e) + { + throw new PIRException("filterName = " + filterName + " cannot be instantiated or does not implement DataFilter interface"); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/c557fffe/src/main/resources/query-schema.xsd ---------------------------------------------------------------------- diff --git a/src/main/resources/query-schema.xsd b/src/main/resources/query-schema.xsd index a0657e4..65a36ce 100644 --- a/src/main/resources/query-schema.xsd +++ b/src/main/resources/query-schema.xsd @@ -22,24 +22,93 @@ <xs:element name="schema"> <xs:complexType> <xs:sequence> - <xs:element name="schemaName" type="xs:string" /> - <xs:element name="dataSchemaName" type="xs:string" /> - <xs:element name="selectorName" type="xs:string" /> + <xs:element name="schemaName" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of the query schema. + The name omits leading and trailing + whitespace, and is case sensitive. + </xs:documentation> + </xs:annotation> + </xs:element> + + <xs:element name="dataSchemaName" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of the data schema + over which this query is run. The name omits + leading and trailing whitespace, and is case + sensitive. + </xs:documentation> + </xs:annotation> + </xs:element> + + <xs:element name="selectorName" type="xs:string"> + <xs:annotation> + <xs:documentation>The name of the + element in the data schema that will be the + selector for this query. + </xs:documentation> + </xs:annotation> + </xs:element> + <xs:element name="elements"> + <xs:annotation> + <xs:documentation> + The set of element names to + include in the query response. 
+ </xs:documentation> + </xs:annotation> <xs:complexType> <xs:sequence> + <xs:element name="name" type="xs:string" - maxOccurs="unbounded" /> + maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + The name of an + element in the data schema to + include in the query response. + </xs:documentation> + </xs:annotation> + </xs:element> + </xs:sequence> </xs:complexType> </xs:element> + <xs:element name="filter" type="xs:string" - minOccurs="0" /> + minOccurs="0"> + <xs:annotation> + <xs:documentation> + The name of a class used to + filter the query response data. + </xs:documentation> + </xs:annotation> + </xs:element> + <xs:element name="filterNames" minOccurs="0" maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + The set of data element names + over which the + response filter is applied. + </xs:documentation> + </xs:annotation> <xs:complexType> <xs:sequence> - <xs:element name="name" type="xs:string" /> + + <xs:element name="name" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of an + element in the data schema over + which to apply the filter. + </xs:documentation> + </xs:annotation> + </xs:element> + </xs:sequence> </xs:complexType> </xs:element>
