Repository: incubator-pirk Updated Branches: refs/heads/master c12d47f1d -> 89052a78a
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/LoadQuerySchemas.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/LoadQuerySchemas.java b/src/main/java/org/apache/pirk/schema/query/LoadQuerySchemas.java deleted file mode 100644 index 93ae66a..0000000 --- a/src/main/java/org/apache/pirk/schema/query/LoadQuerySchemas.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pirk.schema.query; - -import java.io.File; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; -import java.util.TreeSet; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.pirk.schema.data.DataSchema; -import org.apache.pirk.schema.data.DataSchemaRegistry; -import org.apache.pirk.schema.data.partitioner.DataPartitioner; -import org.apache.pirk.utils.SystemConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; - -/** - * Class to load any query schemas specified in the properties file, 'query.schemas' - * <p> - * Schemas should be specified as follows; all items are treated in a case insensitive manner: - * - * <pre> - * {@code - * <schema> - * <schemaName> name of the schema </schemaName> - * <dataSchemaName> name of the data schema over which this query is run </dataSchemaName> - * <selectorName> name of the element in the data schema that will be the selector </selectorName> - * <elements> - * <name> element name of element in the data schema to include in the query response </name> - * </elements> - * <filter> (optional) name of the filter class to use to filter the data </filter> - * <filterNames> (optional) - * <name> element name of element in the data schema to apply pre-processing filters </name> - * </filterNames> - * </schema> - * } - * </pre> - * <p> - * TODO: Allow the schema to specify how many array elements to return per element, if the element is an array type - */ -public class LoadQuerySchemas -{ - private static final Logger logger = LoggerFactory.getLogger(LoadQuerySchemas.class); - - private static HashMap<String,QuerySchema> schemaMap; - - static - { - logger.info("Loading query schemas: "); - - schemaMap = new HashMap<>(); - try - { - initialize(); - } catch (Exception e) - { - logger.error("Caught exception: "); - e.printStackTrace(); - } - } - - public static void initialize() throws Exception - { - initialize(false, null); - } - - public static void initialize(boolean hdfs, FileSystem fs) throws Exception - { - String querySchemas = SystemConfiguration.getProperty("query.schemas", "none"); - if (!querySchemas.equals("none")) - { - String[] querySchemaFiles = querySchemas.split(","); - for (String schemaFile : querySchemaFiles) - { - logger.info("Loading schemaFile = " + schemaFile); - - // Parse and load the schema file into a QuerySchema object; place in the schemaMap - QuerySchema querySchema = loadQuerySchemaFile(schemaFile, hdfs, fs); - schemaMap.put(querySchema.getSchemaName(), querySchema); - } - } - } - - public static HashMap<String,QuerySchema> getSchemaMap() - { - return schemaMap; - } - - public static Set<String> getSchemaNames() - { - return schemaMap.keySet(); - } - - public static QuerySchema getSchema(String schemaName) - { - return schemaMap.get(schemaName); - } - - private static QuerySchema loadQuerySchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception - { - QuerySchema querySchema; - - DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); - DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); - - // Read in and parse the schema file - Document doc; - if (hdfs) - { - Path filePath = new Path(schemaFile); - doc = dBuilder.parse(fs.open(filePath)); - logger.info("hdfs: filePath = " + filePath.toString()); - } - else - { - File inputFile = new File(schemaFile); - doc = dBuilder.parse(inputFile); - logger.info("localFS: inputFile = " + inputFile.toString()); - } - doc.getDocumentElement().normalize(); - logger.info("Root element: " + doc.getDocumentElement().getNodeName()); - - // Extract the schemaName - String schemaName = extractValue(doc, "schemaName"); - logger.info("schemaName = " + schemaName); - - // Extract the dataSchemaName - String dataSchemaName = extractValue(doc, "dataSchemaName"); - logger.info("dataSchemaName = " + dataSchemaName); - - DataSchema dataSchema = DataSchemaRegistry.get(dataSchemaName); - if (dataSchema == null) - { - throw new Exception("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName); - } - - // Extract the selectorName - String selectorName = extractValue(doc, "selectorName"); - logger.info("selectorName = " + selectorName); - if (!dataSchema.containsElement(selectorName)) - { - throw new Exception("dataSchema = " + dataSchemaName + " does not contain selectorName = " + selectorName); - } - - // Extract the elements - NodeList elementsList = doc.getElementsByTagName("elements"); - if (elementsList.getLength() > 1) - { - throw new Exception("elementsList.getLength() = " + elementsList.getLength() + " -- should be 1"); - } - Element elements = (Element) elementsList.item(0); - - TreeSet<String> elementNames = new TreeSet<>(); - int dataElementSize = 0; - NodeList nList = elements.getElementsByTagName("name"); - for (int i = 0; i < nList.getLength(); i++) - { - Node nNode = nList.item(i); - if (nNode.getNodeType() == Node.ELEMENT_NODE) - { - Element eElement = (Element) nNode; - - // Pull the name and add to the TreeSet - String name = eElement.getFirstChild().getNodeValue().trim(); - elementNames.add(name); - - // Compute the number of bits for this element - logger.info("name = " + name); - logger.info("partitionerName = " + dataSchema.getPartitionerTypeName(name)); - if ((dataSchema.getPartitionerForElement(name)) == null) - { - logger.info("partitioner is null"); - } - int bits = ((DataPartitioner) dataSchema.getPartitionerForElement(name)).getBits(dataSchema.getElementType(name)); - - // Multiply by the number of array elements allowed, if applicable - if (dataSchema.getArrayElements().contains(name)) - { - bits *= Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements")); - } - dataElementSize += bits; - - logger.info("name = " + name + " bits = " + bits + " dataElementSize = " + dataElementSize); - } - } - - // Extract the filter, if it exists - String filter = QuerySchema.NO_FILTER; - if (doc.getElementsByTagName("filter").item(0) != null) - { - filter = doc.getElementsByTagName("filter").item(0).getTextContent().trim(); - } - - // Extract the filterNames, if they exist - HashSet<String> filterNamesSet = new HashSet<>(); - if (doc.getElementsByTagName("filterNames").item(0) != null) - { - NodeList filterNamesList = doc.getElementsByTagName("filterNames"); - if (filterNamesList.getLength() > 1) - { - throw new Exception("filterNamesList.getLength() = " + filterNamesList.getLength() + " -- should be 1"); - } - Element filterNames = (Element) filterNamesList.item(0); - - NodeList filterNList = filterNames.getElementsByTagName("name"); - for (int i = 0; i < filterNList.getLength(); i++) - { - Node nNode = filterNList.item(i); - if (nNode.getNodeType() == Node.ELEMENT_NODE) - { - Element eElement = (Element) nNode; - - // Pull the name and add to the TreeSet - String name = eElement.getFirstChild().getNodeValue().trim(); - filterNamesSet.add(name); - - logger.info("filterName = " + name); - } - } - } - - // Create the query schema object - querySchema = new QuerySchema(schemaName, dataSchemaName, elementNames, selectorName, dataElementSize, filterNamesSet, filter); - - return querySchema; - } - - /** - * Extracts a top level, single value from the xml structure - */ - private static String extractValue(Document doc, String valueName) throws Exception - { - String value; - - NodeList itemList = doc.getElementsByTagName(valueName); - if (itemList.getLength() > 1) - { - throw new Exception("itemList.getLength() = " + itemList.getLength() + " -- should be 1"); - } - value = itemList.item(0).getTextContent().trim(); - - return value; - } - - /** - * Checks whether or not (true/false) the given schema is loaded - */ - public static boolean containsSchema(String schemaNameIn) - { - return schemaMap.containsKey(schemaNameIn); - } - - public static void printSchemas() - { - for (String schema : schemaMap.keySet()) - { - logger.info("schema = " + schema); - } - } - - public static String getSchemasString() - { - String schemasString = ""; - for (String schema : schemaMap.keySet()) - { - schemasString += " \n" + schema; - } - return schemasString; - } -} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/QuerySchema.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java index 09e4d85..9099376 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java @@ -19,70 +19,97 @@ package org.apache.pirk.schema.query; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashSet; -import java.util.TreeSet; +import java.util.List; +import java.util.Set; -import org.apache.pirk.schema.query.filter.FilterFactory; +import org.apache.pirk.schema.query.filter.DataFilter; /** * Class to hold a query schema - * <p> - * TODO: - * <p> - * -Could easily add the ability for multiple filters (filter list capability) instead of just one + * */ public class QuerySchema implements Serializable { private static final long serialVersionUID = 1L; - static final String NO_FILTER = "noFilter"; - - private String schemaName = null; + // This schema's name. + private final String schemaName; - private String dataSchemaName = null; // name of the DataSchema for this query schema + // Name of the data schema associated with this query schema. + private final String dataSchemaName; - private TreeSet<String> elementNames = null; // names of elements in the data schema to - // include in the response, order matters for packing/unpacking + // Name of element in the dataSchema to be used as the selector. + private final String selectorName; - private String filter = null; // name of filter class to use in data filtering + // Element names from the data schema to include in the response. + // Order matters for packing/unpacking. + private final List<String> elementNames = new ArrayList<String>(); - private Object filterObj = null; // instance of the filter + // Name of class to use in data filtering. + private final String filterTypeName; - private HashSet<String> filterElementNames = null; // set of element names to apply filtering in pre-processing + // Instance of the filterTypeName. + private final DataFilter filter; - private String selectorName = null; // name of element in the dataSchema to be used as the selector + // Set of data schema element names on which to apply filtering. + private final Set<String> filteredElementNames = new HashSet<>(); - private int dataElementSize = 0; // total number of bits to be returned for each data element hit + // Total number of bits to be returned for each data element hit. + private final int dataElementSize; - public QuerySchema(String schemaNameInput, String dataSchemaNameInput, TreeSet<String> elementNamesInput, String selectorNameInput, int dataElementSizeInput, - HashSet<String> filterElementNamesInput, String filterIn) throws Exception + QuerySchema(String schemaName, String dataSchemaName, String selectorName, String filterTypeName, DataFilter filter, int dataElementSize) { - schemaName = schemaNameInput; - dataSchemaName = dataSchemaNameInput; - elementNames = elementNamesInput; - selectorName = selectorNameInput; - dataElementSize = dataElementSizeInput; - filterElementNames = filterElementNamesInput; - filter = filterIn; - - instantiateFilter(); + this.schemaName = schemaName; + this.dataSchemaName = dataSchemaName; + this.selectorName = selectorName; + this.filterTypeName = filterTypeName; + this.filter = filter; + this.dataElementSize = dataElementSize; } + /** + * Returns the name of this schema. + * + * @return The schema name. + */ public String getSchemaName() { return schemaName; } + /** + * Returns the name of the data schema. + * <p> + * This query is designed to be run over data described by this data schema. + * + * @return The data schema name. + */ public String getDataSchemaName() { return dataSchemaName; } - public TreeSet<String> getElementNames() + /** + * Returns the element names to include in the response. + * <p> + * The element names are defined by the data schema associated with this query. + * + * @return The ordered list of query element names. + */ + public List<String> getElementNames() { return elementNames; } + /** + * Returns the element name used as the selector. + * <p> + * The element names are defined by the data schema associated with this query. + * + * @return The element names being selected. + */ public String getSelectorName() { return selectorName; @@ -94,38 +121,36 @@ public class QuerySchema implements Serializable } /** - * Method to get the name of the filter class for this query + * Returns the name of the filter class for this query. + * + * The filter class name is the fully qualified name of a Java class that implements the {@link DataFilter} interface. + * + * @return The type name of the query filter, or <code>null</code> if there is no filter defined. */ - public String getFilter() + public String getFilterTypeName() { - return filter; + return filterTypeName; } - public HashSet<String> getFilterElementNames() + /** + * Returns the set of element names on which to apply the filter. + * + * @return The possibly empty set of data schema element names. + */ + public Set<String> getFilteredElementNames() { - return filterElementNames; + return filteredElementNames; } /** - * Method to return the instance of the specified filter for this query + * Returns the data element filter for this query. * <p> - * Will return null if no filter has been specified for the query + * The data filter is applied to the {@link QuerySchema#getFilteredElementNames()} data elements. + * + * @return The data filter, or <code>null</code> if no filter has been specified for this query. */ - public Object getFilterInstance() throws Exception + public DataFilter getFilter() { - instantiateFilter(); - - return filterObj; - } - - private void instantiateFilter() throws Exception - { - if (!filter.equals(NO_FILTER)) - { - if (filterObj == null) - { - filterObj = FilterFactory.getFilter(filter, this); - } - } + return filter; } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java new file mode 100644 index 0000000..a8445ca --- /dev/null +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.schema.query; + +import java.io.File; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pirk.schema.data.DataSchema; +import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.data.partitioner.DataPartitioner; +import org.apache.pirk.schema.query.filter.DataFilter; +import org.apache.pirk.schema.query.filter.FilterFactory; +import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +/** + * Class to load any query schemas specified in the properties file, 'query.schemas' + * <p> + * Schemas should be specified as follows; all items are treated in a case insensitive manner: + * + * <pre> + * {@code + * <schema> + * <schemaName> name of the schema </schemaName> + * <dataSchemaName> name of the data schema over which this query is run </dataSchemaName> + * <selectorName> name of the element in the data schema that will be the selector </selectorName> + * <elements> + * <name> element name of element in the data schema to include in the query response </name> + * </elements> + * <filter> (optional) name of the filter class to use to filter the data </filter> + * <filterNames> (optional) + * <name> element name of element in the data schema to apply pre-processing filters </name> + * </filterNames> + * </schema> + * } + * </pre> + * <p> + * TODO: Allow the schema to specify how many array elements to return per element, if the element is an array type + */ +public class QuerySchemaLoader +{ + private static final Logger logger = LoggerFactory.getLogger(QuerySchemaLoader.class); + + private static final String NO_FILTER = "noFilter"; + + static + { + logger.info("Loading query schemas: "); + + try + { + initialize(); + } catch (Exception e) + { + logger.error("Caught exception: "); + e.printStackTrace(); + } + } + + public static void initialize() throws Exception + { + initialize(false, null); + } + + public static void initialize(boolean hdfs, FileSystem fs) throws Exception + { + String querySchemas = SystemConfiguration.getProperty("query.schemas", "none"); + if (!querySchemas.equals("none")) + { + String[] querySchemaFiles = querySchemas.split(","); + for (String schemaFile : querySchemaFiles) + { + logger.info("Loading schemaFile = " + schemaFile); + + // Parse and load the schema file into a QuerySchema object; place in the schemaMap + QuerySchema querySchema = loadQuerySchemaFile(schemaFile, hdfs, fs); + QuerySchemaRegistry.put(querySchema); + } + } + } + + private static QuerySchema loadQuerySchemaFile(String schemaFile, boolean hdfs, FileSystem fs) throws Exception + { + QuerySchema querySchema; + + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); + + // Read in and parse the schema file + Document doc; + if (hdfs) + { + Path filePath = new Path(schemaFile); + doc = dBuilder.parse(fs.open(filePath)); + logger.info("hdfs: filePath = " + filePath.toString()); + } + else + { + File inputFile = new File(schemaFile); + doc = dBuilder.parse(inputFile); + logger.info("localFS: inputFile = " + inputFile.toString()); + } + doc.getDocumentElement().normalize(); + logger.info("Root element: " + doc.getDocumentElement().getNodeName()); + + // Extract the schemaName + String schemaName = extractValue(doc, "schemaName"); + logger.info("schemaName = " + schemaName); + + // Extract the dataSchemaName + String dataSchemaName = extractValue(doc, "dataSchemaName"); + logger.info("dataSchemaName = " + dataSchemaName); + + DataSchema dataSchema = DataSchemaRegistry.get(dataSchemaName); + if (dataSchema == null) + { + throw new Exception("Loaded DataSchema does not exist for dataSchemaName = " + dataSchemaName); + } + + // Extract the selectorName + String selectorName = extractValue(doc, "selectorName"); + logger.info("selectorName = " + selectorName); + if (!dataSchema.containsElement(selectorName)) + { + throw new Exception("dataSchema = " + dataSchemaName + " does not contain selectorName = " + selectorName); + } + + // Extract the elements + NodeList elementsList = doc.getElementsByTagName("elements"); + if (elementsList.getLength() > 1) + { + throw new Exception("elementsList.getLength() = " + elementsList.getLength() + " -- should be 1"); + } + Element elements = (Element) elementsList.item(0); + + LinkedHashSet<String> elementNames = new LinkedHashSet<>(); + int dataElementSize = 0; + NodeList nList = elements.getElementsByTagName("name"); + for (int i = 0; i < nList.getLength(); i++) + { + Node nNode = nList.item(i); + if (nNode.getNodeType() == Node.ELEMENT_NODE) + { + Element eElement = (Element) nNode; + + // Pull the name and add to the TreeSet + String name = eElement.getFirstChild().getNodeValue().trim(); + elementNames.add(name); + + // Compute the number of bits for this element + logger.info("name = " + name); + logger.info("partitionerName = " + dataSchema.getPartitionerTypeName(name)); + if ((dataSchema.getPartitionerForElement(name)) == null) + { + logger.info("partitioner is null"); + } + int bits = ((DataPartitioner) dataSchema.getPartitionerForElement(name)).getBits(dataSchema.getElementType(name)); + + // Multiply by the number of array elements allowed, if applicable + if (dataSchema.getArrayElements().contains(name)) + { + bits *= Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements")); + } + dataElementSize += bits; + + logger.info("name = " + name + " bits = " + bits + " dataElementSize = " + dataElementSize); + } + } + + // Extract the filter, if it exists + String filterTypeName = NO_FILTER; + if (doc.getElementsByTagName("filter").item(0) != null) + { + filterTypeName = doc.getElementsByTagName("filter").item(0).getTextContent().trim(); + } + + // Extract the filterNames, if they exist + HashSet<String> filterNamesSet = new HashSet<>(); + if (doc.getElementsByTagName("filterNames").item(0) != null) + { + NodeList filterNamesList = doc.getElementsByTagName("filterNames"); + if (filterNamesList.getLength() > 1) + { + throw new Exception("filterNamesList.getLength() = " + filterNamesList.getLength() + " -- should be 1"); + } + Element filterNames = (Element) filterNamesList.item(0); + + NodeList filterNList = filterNames.getElementsByTagName("name"); + for (int i = 0; i < filterNList.getLength(); i++) + { + Node nNode = filterNList.item(i); + if (nNode.getNodeType() == Node.ELEMENT_NODE) + { + Element eElement = (Element) nNode; + + // Pull the name and add to the TreeSet + String name = eElement.getFirstChild().getNodeValue().trim(); + filterNamesSet.add(name); + + logger.info("filterName = " + name); + } + } + } + + // Create the query schema object + + DataFilter filter = instantiateFilter(filterTypeName, filterNamesSet); + querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize); + querySchema.getElementNames().addAll(elementNames); + querySchema.getFilteredElementNames().addAll(filterNamesSet); + return querySchema; + } + + /** + * Extracts a top level, single value from the xml structure + */ + private static String extractValue(Document doc, String valueName) throws Exception + { + NodeList itemList = doc.getElementsByTagName(valueName); + if (itemList.getLength() > 1) + { + throw new Exception("itemList.getLength() = " + itemList.getLength() + " -- should be 1"); + } + return itemList.item(0).getTextContent().trim(); + } + + private static DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws Exception + { + if (!filterTypeName.equals(NO_FILTER)) + { + return FilterFactory.getFilter(filterTypeName, filteredElementNames); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/QuerySchemaRegistry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaRegistry.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaRegistry.java new file mode 100644 index 0000000..ce42d79 --- /dev/null +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaRegistry.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.schema.query; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class QuerySchemaRegistry +{ + // The registry. Maps schema name to query schema. + private static final Map<String,QuerySchema> registry = new HashMap<>(); + + // Not designed to be instantiated. + QuerySchemaRegistry() + { + + } + + /** + * Adds the given query schema to the registry. + * + * If there was an existing schema with the same name, it is replaced. + * + * @param schema + * The query schema to add. + * @return the previous schema registered at the same name, or <code>null</code> if there were none. + */ + public static QuerySchema put(QuerySchema schema) + { + return registry.put(schema.getSchemaName(), schema); + } + + /** + * Returns the query schema with the given name. + * + * @param schemaName + * The query schema name to be returned. + * @return The query schema, or <code>null</code> if no such schema. + */ + public static QuerySchema get(String schemaName) + { + return registry.get(schemaName); + } + + /** + * Returns the set of query schema names held in the registry. + * + * @return The possibly empty set of query schema names. + */ + public static Set<String> getNames() + { + return registry.keySet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java index 946f803..bfef7e3 100644 --- a/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java +++ b/src/main/java/org/apache/pirk/schema/query/filter/FilterFactory.java @@ -23,11 +23,11 @@ import java.io.File; import java.io.FileReader; import java.io.InputStreamReader; import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.utils.SystemConfiguration; /** @@ -35,7 +35,7 @@ import org.apache.pirk.utils.SystemConfiguration; */ public class FilterFactory { - public static Object getFilter(String filterName, QuerySchema qSchema) throws Exception + public static DataFilter getFilter(String filterName, Set<String> filteredElementNames) throws Exception { Object obj = null; @@ -66,7 +66,7 @@ public class FilterFactory stopList.add(qLine); } - obj = new StopListFilter(qSchema.getFilterElementNames(), stopList); + obj = new StopListFilter(filteredElementNames, stopList); } } else @@ -80,6 +80,6 @@ public class FilterFactory } } - return obj; + return (DataFilter) obj; } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java b/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java index 018037d..0dcf590 100644 --- a/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java +++ b/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java @@ -21,6 +21,7 @@ package org.apache.pirk.schema.query.filter; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.MapWritable; @@ -39,10 +40,10 @@ public class StopListFilter implements DataFilter private static final Logger logger = LoggerFactory.getLogger(StopListFilter.class); - private HashSet<String> filterSet = null; - private HashSet<String> stopList = null; + private Set<String> filterSet = null; + private Set<String> stopList = null; - public StopListFilter(HashSet<String> filterSetIn, HashSet<String> stopListIn) + public StopListFilter(Set<String> filterSetIn, Set<String> stopListIn) { filterSet = filterSetIn; stopList = stopListIn; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java b/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java index d8928da..e9c1161 100644 --- a/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java +++ b/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java @@ -27,8 +27,8 @@ import org.apache.hadoop.io.Text; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.DataSchemaRegistry; -import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.slf4j.Logger; @@ -75,7 +75,7 @@ public class QueryResponseJSON implements Serializable logger.info("queryInfo is null"); } - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + QuerySchema qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType()); dSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName()); jsonObj = new JSONObject(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java index c946e3b..496840c 100755 --- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.pirk.schema.data.DataSchemaLoader; -import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchemaLoader; import org.apache.pirk.schema.query.filter.StopListFilter; import org.apache.pirk.test.distributed.testsuite.DistTestSuite; import org.apache.pirk.test.utils.Inputs; @@ -142,7 +142,7 @@ public class DistributedTestDriver if (!querySchemasProp.equals("none")) { - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java index a01df92..04e16d3 100644 --- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -27,8 +27,8 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.pirk.query.wideskies.QueryUtils; -import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.test.distributed.testsuite.DistTestSuite; import org.apache.pirk.utils.StringUtils; @@ -75,7 +75,7 @@ public class BaseTests { logger.info("Running testDNSHostnameQuery(): "); - QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_HOSTNAME_QUERY); + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY); int numExpectedResults = 6; ArrayList<QueryResponseJSON> results; @@ -200,7 +200,7 @@ public class BaseTests { logger.info("Running testDNSIPQuery(): "); - QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_IP_QUERY); + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_IP_QUERY); ArrayList<QueryResponseJSON> results; if (isDistributed) @@ -275,7 +275,7 @@ public class BaseTests { logger.info("Running testDNSNXDOMAINQuery(): "); - QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_NXDOMAIN_QUERY); + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_NXDOMAIN_QUERY); ArrayList<QueryResponseJSON> results; if (isDistributed) @@ -339,7 +339,7 @@ public class BaseTests { logger.info("Running testSRCIPQuery(): "); - QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_SRCIP_QUERY); + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY); ArrayList<QueryResponseJSON> results; int removeTailElements = 0; @@ -413,7 +413,7 @@ public class BaseTests { logger.info("Running testSRCIPQueryNoFilter(): "); - QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_SRCIP_QUERY_NO_FILTER); + QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY_NO_FILTER); ArrayList<QueryResponseJSON> results; int numExpectedResults = 3; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index 826815f..be4d848 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -39,7 +39,7 @@ import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; import org.apache.pirk.schema.data.partitioner.ISO8601DatePartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; -import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchemaLoader; import org.apache.pirk.test.distributed.DistributedTestDriver; import org.apache.pirk.utils.HDFS; import org.apache.pirk.utils.SystemConfiguration; @@ -497,7 +497,7 @@ public class Inputs null, null, false, fs, hdfs); } - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java index d88daa9..089bec8 100644 --- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java +++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java @@ -35,8 +35,8 @@ import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.responder.wideskies.standalone.Responder; import org.apache.pirk.response.wideskies.Response; -import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.PIRException; @@ -62,7 +62,7 @@ public class StandaloneQuery logger.info("Performing watchlisting: "); ArrayList<QueryResponseJSON> results = null; - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType); + QuerySchema qSchema = QuerySchemaRegistry.get(queryType); // Create the necessary files LocalFileSystemStore storage = new LocalFileSystemStore(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/utils/FileConst.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/FileConst.java b/src/main/java/org/apache/pirk/utils/FileConst.java index 382b97d..a8ec42b 100644 --- a/src/main/java/org/apache/pirk/utils/FileConst.java +++ b/src/main/java/org/apache/pirk/utils/FileConst.java @@ -24,12 +24,12 @@ package org.apache.pirk.utils; public class FileConst { // For general output - public static String COUNTS = "counts"; - public static String DETAILS = "details"; + public static final String COUNTS = "counts"; + public static final String DETAILS = "details"; // For PIR - public static String PIR = "pir"; - public static String EXP = "exp"; - public static String PIR_COLS = "pirCols"; - public static String PIR_FINAL = "pirFinal"; + public static final String PIR = "pir"; + public static final String EXP = "exp"; + public static final String PIR_COLS = "pirCols"; + public static final String PIR_FINAL = "pirFinal"; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/utils/StopListUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/StopListUtils.java b/src/main/java/org/apache/pirk/utils/StopListUtils.java index 285179e..af21a24 100644 --- a/src/main/java/org/apache/pirk/utils/StopListUtils.java +++ b/src/main/java/org/apache/pirk/utils/StopListUtils.java @@ -18,7 +18,7 @@ */ package org.apache.pirk.utils; -import java.util.HashSet; +import java.util.Set; /** * Utilities for stop listing data items/elements @@ -28,7 +28,7 @@ public class StopListUtils /** * Checks to see whether an element (or subdomain of the given element) is contained in the HashSet If it is not in the set, returns true (keep) */ - public static boolean checkElement(String element, HashSet<String> filterSet) + public static boolean checkElement(String element, Set<String> filterSet) { boolean notInSet = true; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/main/java/org/apache/pirk/utils/SystemConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java index c10ecfc..7bea235 100755 --- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java +++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java @@ -25,7 +25,7 @@ import java.io.InputStream; import java.util.Properties; import org.apache.pirk.schema.data.DataSchemaLoader; -import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchemaLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +71,7 @@ public class SystemConfiguration // Load any query schema files indicated in the properties try { - LoadQuerySchemas.class.newInstance(); + QuerySchemaLoader.class.newInstance(); } catch (Exception e) { logger.error("Issue when invoking DataSchemaLoader"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/test/java/test/general/QueryParserUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/general/QueryParserUtilsTest.java b/src/test/java/test/general/QueryParserUtilsTest.java index 16d73bb..394f335 100644 --- a/src/test/java/test/general/QueryParserUtilsTest.java +++ b/src/test/java/test/general/QueryParserUtilsTest.java @@ -18,15 +18,13 @@ */ package test.general; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.util.ArrayList; import java.util.Map; import org.apache.hadoop.io.MapWritable; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.test.utils.Inputs; import org.apache.pirk.utils.QueryParserUtils; import org.apache.pirk.utils.StringUtils; @@ -35,6 +33,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * Class for testing the QueryParser methods */ http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/test/java/test/schema/data/LoadDataSchemaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/schema/data/LoadDataSchemaTest.java b/src/test/java/test/schema/data/LoadDataSchemaTest.java index 07167fc..ab94ceb 100644 --- a/src/test/java/test/schema/data/LoadDataSchemaTest.java +++ b/src/test/java/test/schema/data/LoadDataSchemaTest.java @@ -18,10 +18,6 @@ */ package test.schema.data; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.File; import java.io.IOException; @@ -33,8 +29,8 @@ import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; import org.apache.pirk.schema.data.DataSchema; -import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; import org.apache.pirk.test.utils.TestUtils; @@ -44,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test suite for LoadDataSchema and DataSchema http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/89052a78/src/test/java/test/schema/query/LoadQuerySchemaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/schema/query/LoadQuerySchemaTest.java b/src/test/java/test/schema/query/LoadQuerySchemaTest.java index fb68c2a..310580d 100644 --- a/src/test/java/test/schema/query/LoadQuerySchemaTest.java +++ b/src/test/java/test/schema/query/LoadQuerySchemaTest.java @@ -37,8 +37,9 @@ import javax.xml.transform.stream.StreamResult; import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; -import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchemaLoader; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; import org.apache.pirk.schema.query.filter.StopListFilter; import org.apache.pirk.test.utils.Inputs; import org.apache.pirk.test.utils.TestUtils; @@ -104,17 +105,17 @@ public class LoadQuerySchemaTest e.printStackTrace(); fail(e.toString()); } - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); // Check the entries - QuerySchema qSchema = LoadQuerySchemas.getSchema(querySchemaName); + QuerySchema qSchema = QuerySchemaRegistry.get(querySchemaName); assertEquals(querySchemaName, qSchema.getSchemaName()); assertEquals(dataSchemaName, qSchema.getDataSchemaName()); assertEquals(element4, qSchema.getSelectorName()); - assertEquals(StopListFilter.class.getName(), qSchema.getFilter()); - if (!(qSchema.getFilterInstance() instanceof StopListFilter)) + assertEquals(StopListFilter.class.getName(), qSchema.getFilterTypeName()); + if (!(qSchema.getFilter() instanceof StopListFilter)) { fail("Filter class instance must be StopListFilter"); } @@ -127,8 +128,8 @@ public class LoadQuerySchemaTest fail("elementNames: item = " + item + " must equal one of: " + element1 + ", " + element2 + ", or " + element3); } } - assertEquals(1, qSchema.getFilterElementNames().size()); - for (String item : qSchema.getFilterElementNames()) + assertEquals(1, qSchema.getFilteredElementNames().size()); + for (String item : qSchema.getFilteredElementNames()) { if (!item.equals(element2)) { @@ -155,7 +156,7 @@ public class LoadQuerySchemaTest if (!querySchemasProp.equals("none")) { - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } logger.info("Finished testGeneralSchemaLoad: "); @@ -191,8 +192,8 @@ public class LoadQuerySchemaTest } try { - LoadQuerySchemas.initialize(); - fail("LoadQuerySchemas did not throw exception for bogus filter class"); + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for bogus filter class"); } catch (Exception ignore) {} @@ -208,7 +209,7 @@ public class LoadQuerySchemaTest if (!querySchemasProp.equals("none")) { - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } logger.info("Finished testFunkyFilterScenarios"); @@ -234,8 +235,8 @@ public class LoadQuerySchemaTest } try { - LoadQuerySchemas.initialize(); - fail("LoadQuerySchemas did not throw exception for non-existent DataSchema"); + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for non-existent DataSchema"); } catch (Exception ignore) {} @@ -243,7 +244,7 @@ public class LoadQuerySchemaTest SystemConfiguration.setProperty("query.schemas", querySchemasProp); if (!querySchemasProp.equals("none")) { - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } logger.info("Finished testDataSchemaDoesNotExist "); @@ -282,8 +283,8 @@ public class LoadQuerySchemaTest } try { - LoadQuerySchemas.initialize(); - fail("LoadQuerySchemas did not throw exception for non-existent selectorName"); + QuerySchemaLoader.initialize(); + fail("QuerySchemaLoader did not throw exception for non-existent selectorName"); } catch (Exception ignore) {} @@ -299,7 +300,7 @@ public class LoadQuerySchemaTest if (!querySchemasProp.equals("none")) { - LoadQuerySchemas.initialize(); + QuerySchemaLoader.initialize(); } logger.info("Finished testSelectorDoesNotExistInDataSchema ");
