http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java index 024a89d..a8f26cb 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierCLI.java @@ -163,20 +163,22 @@ public class QuerierCLI options.addOption(optionACTION); // INPUTFILE - Option optionINPUTFILE = new Option("i", QuerierProps.INPUTFILE, true, "required - Fully qualified file containing input " - + "-- \n The input is either: \n (1) For Encryption: A query file - Contains the query selectors, one per line; " - + "the first line must be the query number \n OR \n (2) For Decryption: A response file - Contains the serialized Response object"); + Option optionINPUTFILE = new Option("i", QuerierProps.INPUTFILE, true, + "required - Fully qualified file containing input " + + "-- \n The input is either: \n (1) For Encryption: A query file - Contains the query selectors, one per line; " + + "the first line must be the query number \n OR \n (2) For Decryption: A response file - Contains the serialized Response object"); optionINPUTFILE.setRequired(false); optionINPUTFILE.setArgName(QuerierProps.INPUTFILE); optionINPUTFILE.setType(String.class); options.addOption(optionINPUTFILE); // OUTPUTFILE - Option optionOUTPUTFILE = new Option("o", QuerierProps.OUTPUTFILE, true, "required - Fully qualified file for the result output. " - + "\n The output file specifies either: \n (1) For encryption: \n \t (a) A file to contain the serialized Querier object named: " + "<outputFile>-" - + QuerierConst.QUERIER_FILETAG + " AND \n \t " + "(b) A file to contain the serialized Query object named: <outputFile>-" + QuerierConst.QUERY_FILETAG - + "\n " + "OR \n (2) A file to contain the decryption results where each line is where each line " - + "corresponds to one hit and is a JSON object with the schema QuerySchema"); + Option optionOUTPUTFILE = new Option("o", QuerierProps.OUTPUTFILE, true, + "required - Fully qualified file for the result output. " + + "\n The output file specifies either: \n (1) For encryption: \n \t (a) A file to contain the serialized Querier object named: " + "<outputFile>-" + + QuerierConst.QUERIER_FILETAG + " AND \n \t " + "(b) A file to contain the serialized Query object named: <outputFile>-" + + QuerierConst.QUERY_FILETAG + "\n " + "OR \n (2) A file to contain the decryption results where each line is where each line " + + "corresponds to one hit and is a JSON object with the schema QuerySchema"); optionOUTPUTFILE.setRequired(false); optionOUTPUTFILE.setArgName(QuerierProps.OUTPUTFILE); optionOUTPUTFILE.setType(String.class); @@ -204,8 +206,8 @@ public class QuerierCLI options.addOption(optionQuerySchemas); // TYPE - Option optionTYPE = new Option("qt", QuerierProps.QUERYTYPE, true, "required for encryption -- Type of the query as defined " - + "in the 'schemaName' tag of the corresponding query schema file"); + Option optionTYPE = new Option("qt", QuerierProps.QUERYTYPE, true, + "required for encryption -- Type of the query as defined " + "in the 'schemaName' tag of the corresponding query schema file"); optionTYPE.setRequired(false); optionTYPE.setArgName(QuerierProps.QUERYTYPE); optionTYPE.setType(String.class);
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java index c1a4e4a..0338281 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java @@ -153,9 +153,9 @@ public class QuerierDriver implements Serializable // Perform the action if (action.equals("encrypt")) { - logger.info("Performing encryption: \n inputFile = " + inputFile + "\n outputFile = " + outputFile + "\n numThreads = " + numThreads - + "\n hashBitSize = " + hashBitSize + "\n hashKey = " + hashKey + "\n dataPartitionBitSize = " + dataPartitionBitSize + "\n paillierBitSize = " - + paillierBitSize + "\n certainty = " + certainty); + logger.info("Performing encryption: \n inputFile = " + inputFile + "\n outputFile = " + outputFile + "\n numThreads = " + numThreads + "\n hashBitSize = " + + hashBitSize + "\n hashKey = " + hashKey + "\n dataPartitionBitSize = " + dataPartitionBitSize + "\n paillierBitSize = " + paillierBitSize + + "\n certainty = " + certainty); // Read in the selectors and extract the queryIdentifier - first line in the file ArrayList<String> selectors = FileIOUtils.readToArrayList(inputFile); @@ -182,8 +182,8 @@ public class QuerierDriver implements Serializable BigInteger val = (BigInteger.valueOf(2)).pow(exp); if (val.compareTo(paillier.getN()) != -1) { - logger.error("The number of selectors = " + numSelectors + " must be such that " + "2^{numSelector*dataPartitionBitSize} < N = " - + paillier.getN().toString(2)); + logger.error( + "The number of selectors = " + numSelectors + " must be such that " + "2^{numSelector*dataPartitionBitSize} < N = " + paillier.getN().toString(2)); System.exit(0); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java index 1482137..06bfa28 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java +++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java @@ -72,8 +72,8 @@ public class QueryInfo implements Serializable, Cloneable embedSelectorInput, useHDFSExpLookupTableInput); } - public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInput, String hashKeyInput, int dataPartitionBitSizeInput, - String queryTypeInput, boolean useExpLookupTableInput, boolean embedSelectorInput, boolean useHDFSExpLookupTableInput) + public QueryInfo(UUID identifierInput, int numSelectorsInput, int hashBitSizeInput, String hashKeyInput, int dataPartitionBitSizeInput, String queryTypeInput, + boolean useExpLookupTableInput, boolean embedSelectorInput, boolean useHDFSExpLookupTableInput) { identifier = identifierInput; queryType = queryTypeInput; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java index 514491a..e08f2da 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java @@ -111,7 +111,7 @@ public class ResponderCLI * Method to parse and validate the options provided * * @return - true if valid, false otherwise - * @throws IOException + * @throws IOException */ private boolean parseOptions() throws IOException { @@ -122,15 +122,15 @@ public class ResponderCLI { SystemConfiguration.loadPropsFromFile(new File(getOptionValue(LOCALPROPFILE))); } - else if(hasOption(HDFSPROPDIR)) + else if (hasOption(HDFSPROPDIR)) { - FileSystem fs = FileSystem.get(new Configuration()); - SystemConfiguration.loadPropsFromHDFSDir(getOptionValue(HDFSPROPDIR), fs); + FileSystem fs = FileSystem.get(new Configuration()); + SystemConfiguration.loadPropsFromHDFSDir(getOptionValue(HDFSPROPDIR), fs); } - else if(hasOption(HDFSPROPFILE)) + else if (hasOption(HDFSPROPFILE)) { - FileSystem fs = FileSystem.get(new Configuration()); - SystemConfiguration.loadPropsFromFile(getOptionValue(HDFSPROPFILE), fs); + FileSystem fs = FileSystem.get(new Configuration()); + SystemConfiguration.loadPropsFromFile(getOptionValue(HDFSPROPFILE), fs); } else { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java index 02dbf2e..0e1561c 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java @@ -57,12 +57,10 @@ public class ResponderDriver { responder.run(); } - } - catch (PIRException pirEx) + } catch (PIRException pirEx) { logger.error("Failed to load platform plugin: {}! {}", platformName, pirEx.getMessage()); - } - catch (Exception ex) + } catch (Exception ex) { logger.error("Failed to run platform plugin: {}! {}", platformName, ex); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java index a9eb80d..64d0e6a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java @@ -106,15 +106,16 @@ public class ResponderProps private static final String STORMSALTCOLUMNS = "storm.saltColumns"; private static final String STORMNUMROWDIVS = "storm.rowDivs"; - private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS, - STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP, - STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, + private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, + STORMWORKERS, STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, + STORMMAXWORKERHEAP, STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, STORMSPLITPARTITIONS, STORMSALTCOLUMNS, STORMNUMROWDIVS}; - static final List<String> PROPSLIST = Arrays.asList((String[]) ArrayUtils.addAll(new String[] {PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, - ESRESOURCE, ESQUERY, ESNODES, OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, - REDUCEMEMORY, MAPJAVAOPTS, REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, - USEMODEXPJOIN, COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS)); + static final List<String> PROPSLIST = Arrays + .asList((String[]) ArrayUtils.addAll(new String[] {PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, OUTPUTFILE, + BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS, + REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN, + COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS)); /** * Validates the responder properties @@ -169,7 +170,11 @@ public class ResponderProps if (!SystemConfiguration.hasProperty(BASEQUERY)) { - SystemConfiguration.setProperty("BASEQUERY", "?q=*"); + SystemConfiguration.setProperty(BASEQUERY, "?q=*"); + } + else if (!SystemConfiguration.getProperty(BASEQUERY).startsWith("?q")) + { + SystemConfiguration.setProperty(BASEQUERY, "?q=*"); } } else if (dataInputFormat.equals(InputFormatConst.ES)) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java index 129b1c9..677c1cb 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java @@ -53,7 +53,7 @@ public class ResponderService { try { - for(ResponderPlugin plugin : loader) + for (ResponderPlugin plugin : loader) { if (platformName.equalsIgnoreCase(plugin.getPlatformName())) { @@ -61,8 +61,7 @@ public class ResponderService return plugin; } } - } - catch (ServiceConfigurationError e) + } catch (ServiceConfigurationError e) { logger.error("ResponderPlugin configuration error {}", e); throw new PIRException(e); @@ -70,4 +69,4 @@ public class ResponderService return null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java index fcbc88b..c5844f0 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java @@ -32,7 +32,8 @@ public class MapReduceResponder implements ResponderPlugin private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class); @Override - public String getPlatformName() { + public String getPlatformName() + { return "mapreduce"; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index 00bc5c1..add79a4 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -140,8 +140,8 @@ public class ComputeResponse String stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); useModExpJoin = SystemConfiguration.getBooleanProperty("pir.useModExpJoin", false); - logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery - + " esResource = " + esResource); + logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + " esResource = " + + esResource); // Set the necessary configurations SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java index 20f02ad..f5a591e 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java @@ -93,8 +93,8 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<L } // Compute the encrypted row elements for a query from extracted data partitions - List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector, - maxHitsPerSelector, useLocalCache); + List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector, maxHitsPerSelector, + useLocalCache); returnPairs.addAll(encRowValues); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java index 038287b..8147ff6 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java @@ -34,8 +34,8 @@ import scala.Tuple2; /** * Functionality for computing the encrypted rows using a pre-computed, passed in modular exponentiation lookup table */ -public class EncRowCalcPrecomputedCache implements - PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>>,Long,BigInteger> +public class EncRowCalcPrecomputedCache + implements PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>>,Long,BigInteger> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java index fce905d..c2ed6ec 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java @@ -35,7 +35,8 @@ public class SparkResponder implements ResponderPlugin private static final Logger logger = LoggerFactory.getLogger(SparkResponder.class); @Override - public String getPlatformName() { + public String getPlatformName() + { return "spark"; } @@ -47,8 +48,7 @@ public class SparkResponder implements ResponderPlugin { ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration())); computeResponse.performQuery(); - } - catch (IOException e) + } catch (IOException e) { logger.error("Unable to open filesystem: {}", e); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java index c291df0..492bfea 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java @@ -148,8 +148,8 @@ public class ComputeStreamingResponse queryInput = SystemConfiguration.getProperty("pir.queryInput"); String stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); - logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery - + " esResource = " + esResource); + logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + " esResource = " + + esResource); // Pull the batchSeconds and windowLength parameters long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30); @@ -280,7 +280,8 @@ public class ComputeStreamingResponse { inputRDD = readDataES(); } - else { + else + { throw new PIRException("Unknown data input format " + dataInputFormat); } @@ -458,7 +459,7 @@ public class ComputeStreamingResponse bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue()); // Form and write the response object - encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long, BigInteger>>) rdd -> { + encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long,BigInteger>>) rdd -> { rdd.foreachPartition(new FinalResponseFunction(accum, bVars)); int maxBatchesVar = bVars.getMaxBatches(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java index 4ce0571..3178fef 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java @@ -36,7 +36,8 @@ public class SparkStreamingResponder implements ResponderPlugin private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class); @Override - public String getPlatformName() { + public String getPlatformName() + { return "sparkstreaming"; } @@ -45,7 +46,7 @@ public class SparkStreamingResponder implements ResponderPlugin { // For handling System.exit calls from Spark Streaming System.setSecurityManager(new SystemExitManager()); - + FileSystem fileSys; try { @@ -54,15 +55,14 @@ public class SparkStreamingResponder implements ResponderPlugin { throw new PIRException(e); } - + logger.info("Launching Spark ComputeStreamingResponse:"); ComputeStreamingResponse computeSR = null; try { computeSR = new ComputeStreamingResponse(fileSys); computeSR.performQuery(); - } - catch (SystemExitException e) + } catch (SystemExitException e) { // If System.exit(0) is not caught from Spark Streaming, // the application will complete with a 'failed' status @@ -70,8 +70,7 @@ public class SparkStreamingResponder implements ResponderPlugin } catch (IOException e) { throw new PIRException(e); - } - finally + } finally { // Teardown the context if (computeSR != null) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java index 912850a..26b24bd 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java @@ -30,11 +30,12 @@ import org.apache.pirk.utils.PIRException; public interface ResponderPlugin { /** - * Returns the plugin name for your framework - * This will be the platform argument + * Returns the plugin name for your framework This will be the platform argument + * * @return */ public String getPlatformName(); + /** * This method launches your framework responder. */ http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index 75cd292..cbfd182 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -205,8 +205,8 @@ public class Responder columns.put(i + rowCounter, column); - logger.debug("exp = " + exp + " i = " + i + " partition = " + hitValPartitions.get(i) + " = " + hitValPartitions.get(i).toString(2) + " column = " - + column); + logger.debug( + "exp = " + exp + " i = " + i + " partition = " + hitValPartitions.get(i) + " = " + hitValPartitions.get(i).toString(2) + " column = " + column); logger.debug("After: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter)); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java index 5214c5f..db33b1a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java @@ -35,7 +35,8 @@ public class StandaloneResponder implements ResponderPlugin private static final Logger logger = LoggerFactory.getLogger(StandaloneResponder.class); @Override - public String getPlatformName() { + public String getPlatformName() + { return "standalone"; } @@ -49,8 +50,7 @@ public class StandaloneResponder implements ResponderPlugin Query query = new LocalFileSystemStore().recall(queryInput, Query.class); Responder pirResponder = new Responder(query); pirResponder.computeStandaloneResponse(); - } - catch (IOException e) + } catch (IOException e) { logger.error("Error reading {}, {}", queryInput, e.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java index 90375aa..77edc24 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java @@ -121,8 +121,8 @@ public class EncColMultBolt extends BaseRichBolt @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer - .declareStream(StormConstants.ENCCOLMULTBOLT_ID, new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD)); + outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID, + new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD)); outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Fields("finished")); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java index 8a5b854..3c8fe1a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java @@ -157,8 +157,8 @@ public class EncRowCalcBolt extends BaseRichBolt @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, - StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT)); + outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, + new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT)); outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH)); } @@ -166,7 +166,8 @@ public class EncRowCalcBolt extends BaseRichBolt * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the * colIndexByRow and hitsByRow appropriately. * - * @param tuple {@code Tuple} + * @param tuple + * {@code Tuple} * @return {@code List<Tuple2>} */ private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java index fd23c7e..a479883 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java @@ -133,8 +133,7 @@ public class PirkTopology // b2.setMemoryLoad(5000); // b2.setCPULoad(150.0); - BoltDeclarer b3 = builder - .setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism) + BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism) .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT)) .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java index b988ccc..1f79993 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java @@ -31,6 +31,7 @@ public class StormResponder implements ResponderPlugin { private static final Logger logger = LoggerFactory.getLogger(StormResponder.class); + @Override public String getPlatformName() { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java index bbffaba..b96e71f 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java @@ -43,9 +43,12 @@ public class StormUtils /** * Method to read in serialized Query object from the given queryFile * - * @param useHdfs - true or false - * @param hdfsUri - HDFS path - * @param queryFile - + * @param useHdfs + * - true or false + * @param hdfsUri + * - HDFS path + * @param queryFile + * - * @return {@link Query} */ public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java index d1a4797..dfa33f9 100644 --- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java @@ -71,9 +71,9 @@ public class DataSchemaLoader { private static final Logger logger = LoggerFactory.getLogger(DataSchemaLoader.class); - private static Set<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, - PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, - PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING)); + private static Set<String> allowedPrimitiveJavaTypes = new HashSet<>( + Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT, PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, + PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE, PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING)); static { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/query/QuerySchema.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java index cdc223e..6fa4dd5 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java @@ -20,6 +20,7 @@ package org.apache.pirk.schema.query; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -59,6 +60,9 @@ public class QuerySchema implements Serializable // Total number of bits to be returned for each data element hit. private final int dataElementSize; + // Additional fields by key,value + private final HashMap<String,String> additionalFields = new HashMap<>(); + QuerySchema(String schemaName, String dataSchemaName, String selectorName, String filterTypeName, DataFilter filter, int dataElementSize) { this.schemaName = schemaName; @@ -153,4 +157,27 @@ public class QuerySchema implements Serializable { return filter; } + + /** + * Returns the map of additional field keys and values + * <p> + * Note that additional fields are optional, thus the map may be empty + * + * @return The additionalFields HashMap + */ + public HashMap<String,String> getAdditionalFields() + { + return additionalFields; + } + + /** + * Returns the value from the additionalFields mapping corresponding to the given key + * + * @param key + * @return value from the additionalFields mapping corresponding to the given key + */ + public String getAdditionalFieldValue(String key) + { + return additionalFields.get(key); + } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java index c1b4139..949396b 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java @@ -21,6 +21,7 @@ package org.apache.pirk.schema.query; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; @@ -65,6 +66,12 @@ import org.xml.sax.SAXException; * <filterNames> (optional) * <name> element name of element in the data schema to apply pre-processing filters </name> * </filterNames> + * <additional> (optional) additional fields for the query schema, in <key,value> pairs + * <field> + * <key> key corresponding the the field </key> + * <value> value corresponding to the field </value> + * </field> + * </additional> * </schema> * } * </pre> @@ -118,6 +125,7 @@ public class QuerySchemaLoader String querySchemas = SystemConfiguration.getProperty("query.schemas", "none"); if (querySchemas.equals("none")) { + logger.info("query.schemas = none"); return; } @@ -260,10 +268,29 @@ public class QuerySchemaLoader Set<String> filteredNamesSet = extractFilteredElementNames(doc); DataFilter filter = instantiateFilter(filterTypeName, filteredNamesSet); + // Extract the additional fields, if they exists + HashMap<String,String> additionalFields = new HashMap<String,String>(); + if (doc.getElementsByTagName("additional").item(0) != null) + { + NodeList fieldList = doc.getElementsByTagName("field"); + int numFields = fieldList.getLength(); + if (numFields == 0) + { + throw new PIRException("numFields = " + numFields + " -- should be at least one"); + } + for (int i = 0; i < numFields; ++i) + { + Element fields = (Element) fieldList.item(i); + NodeList kv = fields.getChildNodes(); + additionalFields.put(getNodeValue("key", kv), getNodeValue("value", kv)); + } + } + // Create and return the query schema object. QuerySchema querySchema = new QuerySchema(schemaName, dataSchemaName, selectorName, filterTypeName, filter, dataElementSize); querySchema.getElementNames().addAll(elementNames); querySchema.getFilteredElementNames().addAll(filteredNamesSet); + querySchema.getAdditionalFields().putAll(additionalFields); return querySchema; } @@ -358,6 +385,30 @@ public class QuerySchemaLoader } /** + * Extracts the value corresponding to a given tag from the XML nodeList + * + * @param tagName + * The name of the tag for which to extract the value + * @param nodes + * The NodeList + * @return The given value + */ + private String getNodeValue(String tagName, NodeList nodes) + { + String value = ""; + + for (int x = 0; x < nodes.getLength(); x++) + { + Node node = nodes.item(x); + if (node.getNodeName().equals(tagName)) + { + value = node.getChildNodes().item(0).getNodeValue().trim(); + } + } + return value; + } + + /** * Instantiate the specified filter. * * Exceptions derive from call to the {@code getFilter} method of {@link FilterFactory} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java index 7536d8f..bb2ed66 100644 --- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -51,7 +51,8 @@ public class BaseTests public static final int dataPartitionBitSize = 8; // Selectors for domain and IP queries, queryIdentifier is the first entry for file generation - private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); + private static ArrayList<String> selectorsDomain = new ArrayList<>( + Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); private static ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24")); // Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed... @@ -65,8 +66,7 @@ public class BaseTests testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive, false); } - public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) - throws Exception + public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false, false); } @@ -276,8 +276,7 @@ public class BaseTests } // A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String) - public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) - throws Exception + public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { logger.info("Running testDNSNXDOMAINQuery(): "); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index b6e7251..e107c56 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -460,16 +460,16 @@ public class Inputs TestUtils.createQuerySchema(DNS_IP_QUERY_FILE, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter); if (hdfs) { - TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter, - false, fs, hdfs); + TestUtils.createQuerySchema(DNS_IP_QUERY_FILE_HDFS, DNS_IP_QUERY, TEST_DATA_SCHEMA_NAME, IPS, dnsIPQueryElements, dnsIPQueryFilterElements, filter, false, + fs, hdfs); } // DNS_NXDOMAIN_QUERY List<String> dnsNXQueryElements = Arrays.asList(QNAME, SRCIP, DSTIP); List<String> dnsNXQueryFilterElements = Collections.singletonList(QNAME); - TestUtils - .createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, filter); + TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, + filter); if (hdfs) { TestUtils.createQuerySchema(DNS_NXDOMAIN_QUERY_FILE_HDFS, DNS_NXDOMAIN_QUERY, TEST_DATA_SCHEMA_NAME, QNAME, dnsNXQueryElements, dnsNXQueryFilterElements, @@ -480,8 +480,8 @@ public class Inputs List<String> dnsSrcIPQueryElements = Arrays.asList(QNAME, DSTIP, IPS); List<String> dnsSrcIPQueryFilterElements = Arrays.asList(SRCIP, IPS); - TestUtils - .createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, filter); + TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, + filter); if (hdfs) { TestUtils.createQuerySchema(DNS_SRCIP_QUERY_FILE_HDFS, DNS_SRCIP_QUERY, TEST_DATA_SCHEMA_NAME, SRCIP, dnsSrcIPQueryElements, dnsSrcIPQueryFilterElements, http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/test/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/TestUtils.java b/src/main/java/org/apache/pirk/test/utils/TestUtils.java index 05c9f28..9f62250 100644 --- a/src/main/java/org/apache/pirk/test/utils/TestUtils.java +++ b/src/main/java/org/apache/pirk/test/utils/TestUtils.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import javax.xml.parsers.DocumentBuilder; @@ -131,7 +132,7 @@ public class TestUtils public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput, List<String> elementNames, List<String> filterNames, String filter) throws IOException { - createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false); + createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, true, null, false, null); } /** @@ -140,6 +141,16 @@ public class TestUtils public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput, List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs) throws IOException { + createQuerySchema(schemaFile, querySchemaName, dataSchemaNameInput, selectorNameInput, elementNames, filterNames, filter, append, fs, hdfs, null); + } + + /** + * Creates the test query schema file + */ + public static void createQuerySchema(String schemaFile, String querySchemaName, String dataSchemaNameInput, String selectorNameInput, + List<String> elementNames, List<String> filterNames, String filter, boolean append, FileSystem fs, boolean hdfs, HashMap<String,String> additionalFields) + throws IOException + { logger.info("createQuerySchema: querySchemaName = " + querySchemaName); // Create a temporary file for the test schema, set in the properties @@ -176,6 +187,7 @@ public class TestUtils SystemConfiguration.setProperty("query.schemas", SystemConfiguration.getProperty("query.schemas", "") + "," + fileName); } } + logger.info("query.schemas = " + SystemConfiguration.getProperty("query.schemas")); // Write to the file @@ -234,6 +246,30 @@ public class TestUtils } } + // Add the additionalFields + if (additionalFields != null) + { + Element additionalElement = doc.createElement("additional"); + rootElement.appendChild(additionalElement); + + // Add the key,value pairs + for (String key : additionalFields.keySet()) + { + logger.info("Creating field element with key = " + key + " and value = " + additionalFields.get(key)); + + Element fieldElement = doc.createElement("field"); + additionalElement.appendChild(fieldElement); + + Element keyElement = doc.createElement("key"); + keyElement.appendChild(doc.createTextNode(key)); + fieldElement.appendChild(keyElement); + + Element valueElement = doc.createElement("value"); + valueElement.appendChild(doc.createTextNode(additionalFields.get(key))); + fieldElement.appendChild(valueElement); + } + } + // Write to a xml file TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = transformerFactory.newTransformer(); @@ -268,7 +304,8 @@ public class TestUtils /** * Converts the result file into an ArrayList of QueryResponseJSON objects * - * @throws IOException - {@link IOException} + * @throws IOException + * - {@link IOException} */ public static List<QueryResponseJSON> readResultsFile(File file) throws IOException { @@ -288,7 +325,9 @@ public class TestUtils /** * Write the ArrayList<String to a tmp file in the local filesystem with the given fileName - * @throws IOException - {@link IOException} + * + * @throws IOException + * - {@link IOException} */ public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/utils/QueryParserUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java index 617e6dc..f8df2c7 100644 --- a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java +++ b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java @@ -719,8 +719,8 @@ public class QueryParserUtils } else { - if (!((Integer.parseInt(blocksLower[ipBlock]) <= Integer.parseInt(ipValue[ipBlock])) && (Integer.parseInt(ipValue[ipBlock]) <= Integer - .parseInt(blocksUpper[ipBlock])))) + if (!((Integer.parseInt(blocksLower[ipBlock]) <= Integer.parseInt(ipValue[ipBlock])) + && (Integer.parseInt(ipValue[ipBlock]) <= Integer.parseInt(blocksUpper[ipBlock])))) { logger.info("IP block not within given range"); matches = false; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/java/org/apache/pirk/utils/SystemConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java index 4f2fa03..a5c27a9 100755 --- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java +++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java @@ -45,352 +45,353 @@ import org.slf4j.LoggerFactory; */ public class SystemConfiguration { - private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class); - - private static final Properties props = new Properties(); - - /** - * By default, these files should be found on the root of the classpath - */ - private static final String DEFAULT_PROPERTY_FILE = "pirk.properties"; - private static final String QUERIER_PROPERTIES_FILE = "querier.properties"; - private static final String RESPONDER_PROPERTIES_FILE = "responder.properties"; - - private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir"; - - static - { - initialize(); - } - - public static void initialize() - { - props.clear(); - - // First try to load the default properties file - loadPropsFromResource(DEFAULT_PROPERTY_FILE); - - // Try to load props from the querier and responder property files, if they exist - loadPropsFromResource(QUERIER_PROPERTIES_FILE); - loadPropsFromResource(RESPONDER_PROPERTIES_FILE); - - // Try to load the local properties files, if they exists - loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR)); - } - - /** - * Gets the specified property; returns <code>null</code> if the property isn't found. - * - * @param propertyName - * The name of the requested property. - * @return The value of the property, or <code>null</code> if the property cannot be found. - */ - public static String getProperty(String propertyName) - { - return props.getProperty(propertyName); - } - - /** - * Gets the specified property as a <code>String</code>, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested string property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - */ - public static String getProperty(String propertyName, String defaultValue) - { - return props.getProperty(propertyName, defaultValue); - } - - /** - * Gets the specified property as an <code>int</code>, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested int property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - * @throws NumberFormatException - * If the property does not contain a parsable <code>int</code> value. - */ - public static int getIntProperty(String propertyName, int defaultValue) - { - String value = props.getProperty(propertyName); - return (value == null) ? defaultValue : Integer.parseInt(value); - } - - /** - * Gets the specified property as an <code>long</code>, or the default value if the property isn't found. - * - * @param propertyName - * The name of the requested long property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return The value of the requested property, or the default value if the property is undefined. - * @throws NumberFormatException - * If the property does not contain a parsable <code>long</code> value. - */ - public static long getLongProperty(String propertyName, long defaultValue) - { - String value = props.getProperty(propertyName); - return (value == null) ? defaultValue : Long.parseLong(value); - } - - /** - * Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined. - * - * @param propertyName - * The name of the requested boolean property value. - * @param defaultValue - * The value to return if the property is undefined. - * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>defaultValue</code>. - */ - public static boolean getBooleanProperty(String propertyName, boolean defaultValue) - { - return (isSetTrue(propertyName)) || defaultValue; - } - - /** - * Returns <code>true</code> iff the specified boolean property value is "true". - * <p> - * If the property is not found, or it's value is not "true" then the method will return <code>false</code>. - * - * @param propertyName - * The name of the requested boolean property value. - * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>false</code>. - */ - public static boolean isSetTrue(String propertyName) - { - String value = props.getProperty(propertyName); - return "true".equals(value); - } - - /** - * Sets the property to the given value. - * <p> - * Any previous values stored at the same property name are replaced. - * - * @param propertyName - * The name of the property to set. - * @param value - * The property value. - */ - public static void setProperty(String propertyName, String value) - { - props.setProperty(propertyName, value); - } - - /** - * Returns true iff the given property name is defined. - * - * @param propertyName - * The property name to test. - * @return <code>true</code> if the property is found in the configuration, or <code>false</code> otherwise. - */ - public static boolean hasProperty(String propertyName) - { - return props.containsKey(propertyName); - } - - /** - * Appends a property via a comma separated list - * <p> - * If the property does not exist, it adds it. - * - * @param propertyName - * The property whose value is to be appended with the given value. - * @param value - * The value to be stored, or appended to the current value. - */ - public static void appendProperty(String propertyName, String value) - { - String oldValue = props.getProperty(propertyName); - - if (oldValue != null && !oldValue.equals("none")) - { - oldValue += "," + value; - } - else - { - oldValue = value; - } - props.setProperty(propertyName, oldValue); - } - - /** - * Loads the properties from local properties file in the specified directory. - * <p> - * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. - * - * @param dirName - * The directory to search for the new properties files. - */ - public static void loadPropsFromDir(String dirName) - { - logger.info("Loading properties from dirName = " + dirName); - File[] directoryListing = new File(dirName).listFiles(new FilenameFilter() - { - @Override - public boolean accept(File dir, String name) - { - return name.endsWith(".properties"); - } - }); - - if (directoryListing != null) - { - for (File file : directoryListing) - { - loadPropsFromFile(file); - } - } - } - - /** - * Loads the properties from local properties file in the specified directory in hdfs. - * <p> - * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. - * - * @param dirName - * The directory to search for the new properties files. - * @throws IOException - * @throws FileNotFoundException - */ - public static void loadPropsFromHDFSDir(String dirName, FileSystem fs) throws FileNotFoundException, IOException - { - logger.info("Loading properties from dirName = " + dirName); - - Path dirPath = new Path(dirName); - - FileStatus[] status = fs.listStatus(dirPath); - for (int i=0;i<status.length;i++) - { - if(status[i].getPath().getName().endsWith(".properties")) - { - loadPropsFromFile(status[i].getPath(), fs); - } - } - } - - /** - * Loads the properties from the specified file. - * <p> - * The new properties are added to the current system configuration. - * - * @param file - * The properties file containing the system properties to add. - */ - public static void loadPropsFromFile(File file) - { - if (file.exists()) - { - try (InputStream stream = new FileInputStream(file)) - { - logger.info("Loading properties file '" + file.getAbsolutePath() + "'"); - loadProperties(stream); - } catch (IOException e) - { - logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'"); - e.printStackTrace(); - } - } - else - { - logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'"); - } - } - - /** - * Loads the properties from the specified file in hdfs - * <p> - * The new properties are added to the current system configuration. - * - * @param file - * The properties file containing the system properties to add. - * @throws IOException - */ - public static void loadPropsFromFile(String filename, FileSystem fs) throws IOException - { - Path p = new Path(filename); - loadPropsFromFile(p, fs); - } - - /** - * Loads the properties from the specified file in hdfs - * <p> - * The new properties are added to the current system configuration. - * - * @param file - * The properties file containing the system properties to add. - * @throws IOException - */ - public static void loadPropsFromFile(Path filePath, FileSystem fs) throws IOException - { - if(fs.exists(filePath)) - { - try (InputStream stream = fs.open(filePath);) - { - logger.info("Loading properties file from hdfs'" + filePath.toString() + "'"); - loadProperties(stream); - } catch (IOException e) - { - logger.error("Problem loading properties file from hdfs '" + filePath.toString() + "'"); - e.printStackTrace(); - } - } - else - { - logger.warn("Properties file does not exist: '" + filePath.toString() + "'"); - } - } - - /** - * Loads the properties from the specified resource on the current classloader. - * <p> - * The new properties are added to the current system configuration. - * - * @param name - * The name of the resource defining the properties. - */ - public static void loadPropsFromResource(String name) - { - try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name)) - { - if (stream != null) - { - logger.info("Loading file '" + name + "'"); - loadProperties(stream); - } - else - { - logger.error("No file found '" + name + "'"); - } - } catch (IOException e) - { - logger.error("Problem loading file '" + name + "'"); - e.printStackTrace(); - } - } - - /** - * Load the properties in the Properties object and then trim any whitespace - * <p> - * Properties.load does not do this automatically - * @throws IOException - */ - public static void loadProperties(InputStream stream) throws IOException - { - props.load(stream); - - Enumeration propKeys = props.propertyNames(); - while(propKeys.hasMoreElements()) - { - String tmpKey = (String)propKeys.nextElement(); - String tmpValue = props.getProperty(tmpKey); - tmpValue = tmpValue.trim(); - props.put(tmpKey, tmpValue); - } - } + private static final Logger logger = LoggerFactory.getLogger(SystemConfiguration.class); + + private static final Properties props = new Properties(); + + /** + * By default, these files should be found on the root of the classpath + */ + private static final String DEFAULT_PROPERTY_FILE = "pirk.properties"; + private static final String QUERIER_PROPERTIES_FILE = "querier.properties"; + private static final String RESPONDER_PROPERTIES_FILE = "responder.properties"; + + private static final String LOCAL_PROPERTIES_DIR = "local.pirk.properties.dir"; + + static + { + initialize(); + } + + public static void initialize() + { + props.clear(); + + // First try to load the default properties file + loadPropsFromResource(DEFAULT_PROPERTY_FILE); + + // Try to load props from the querier and responder property files, if they exist + loadPropsFromResource(QUERIER_PROPERTIES_FILE); + loadPropsFromResource(RESPONDER_PROPERTIES_FILE); + + // Try to load the local properties files, if they exists + loadPropsFromDir(getProperty(LOCAL_PROPERTIES_DIR)); + } + + /** + * Gets the specified property; returns <code>null</code> if the property isn't found. + * + * @param propertyName + * The name of the requested property. + * @return The value of the property, or <code>null</code> if the property cannot be found. + */ + public static String getProperty(String propertyName) + { + return props.getProperty(propertyName); + } + + /** + * Gets the specified property as a <code>String</code>, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested string property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + */ + public static String getProperty(String propertyName, String defaultValue) + { + return props.getProperty(propertyName, defaultValue); + } + + /** + * Gets the specified property as an <code>int</code>, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested int property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + * @throws NumberFormatException + * If the property does not contain a parsable <code>int</code> value. + */ + public static int getIntProperty(String propertyName, int defaultValue) + { + String value = props.getProperty(propertyName); + return (value == null) ? defaultValue : Integer.parseInt(value); + } + + /** + * Gets the specified property as an <code>long</code>, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested long property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return The value of the requested property, or the default value if the property is undefined. + * @throws NumberFormatException + * If the property does not contain a parsable <code>long</code> value. + */ + public static long getLongProperty(String propertyName, long defaultValue) + { + String value = props.getProperty(propertyName); + return (value == null) ? defaultValue : Long.parseLong(value); + } + + /** + * Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined. + * + * @param propertyName + * The name of the requested boolean property value. + * @param defaultValue + * The value to return if the property is undefined. + * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>defaultValue</code>. + */ + public static boolean getBooleanProperty(String propertyName, boolean defaultValue) + { + return (isSetTrue(propertyName)) || defaultValue; + } + + /** + * Returns <code>true</code> iff the specified boolean property value is "true". + * <p> + * If the property is not found, or it's value is not "true" then the method will return <code>false</code>. + * + * @param propertyName + * The name of the requested boolean property value. + * @return <code>true</code> if the property is defined and has the value "true", otherwise <code>false</code>. + */ + public static boolean isSetTrue(String propertyName) + { + String value = props.getProperty(propertyName); + return "true".equals(value); + } + + /** + * Sets the property to the given value. + * <p> + * Any previous values stored at the same property name are replaced. + * + * @param propertyName + * The name of the property to set. + * @param value + * The property value. + */ + public static void setProperty(String propertyName, String value) + { + props.setProperty(propertyName, value); + } + + /** + * Returns true iff the given property name is defined. + * + * @param propertyName + * The property name to test. + * @return <code>true</code> if the property is found in the configuration, or <code>false</code> otherwise. + */ + public static boolean hasProperty(String propertyName) + { + return props.containsKey(propertyName); + } + + /** + * Appends a property via a comma separated list + * <p> + * If the property does not exist, it adds it. + * + * @param propertyName + * The property whose value is to be appended with the given value. + * @param value + * The value to be stored, or appended to the current value. + */ + public static void appendProperty(String propertyName, String value) + { + String oldValue = props.getProperty(propertyName); + + if (oldValue != null && !oldValue.equals("none")) + { + oldValue += "," + value; + } + else + { + oldValue = value; + } + props.setProperty(propertyName, oldValue); + } + + /** + * Loads the properties from local properties file in the specified directory. + * <p> + * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. + * + * @param dirName + * The directory to search for the new properties files. + */ + public static void loadPropsFromDir(String dirName) + { + logger.info("Loading properties from dirName = " + dirName); + File[] directoryListing = new File(dirName).listFiles(new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith(".properties"); + } + }); + + if (directoryListing != null) + { + for (File file : directoryListing) + { + loadPropsFromFile(file); + } + } + } + + /** + * Loads the properties from local properties file in the specified directory in hdfs. + * <p> + * All files ending in '.properties' will be loaded. The new properties are added to the current system configuration. + * + * @param dirName + * The directory to search for the new properties files. + * @throws IOException + * @throws FileNotFoundException + */ + public static void loadPropsFromHDFSDir(String dirName, FileSystem fs) throws FileNotFoundException, IOException + { + logger.info("Loading properties from dirName = " + dirName); + + Path dirPath = new Path(dirName); + + FileStatus[] status = fs.listStatus(dirPath); + for (int i = 0; i < status.length; i++) + { + if (status[i].getPath().getName().endsWith(".properties")) + { + loadPropsFromFile(status[i].getPath(), fs); + } + } + } + + /** + * Loads the properties from the specified file. + * <p> + * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + */ + public static void loadPropsFromFile(File file) + { + if (file.exists()) + { + try (InputStream stream = new FileInputStream(file)) + { + logger.info("Loading properties file '" + file.getAbsolutePath() + "'"); + loadProperties(stream); + } catch (IOException e) + { + logger.error("Problem loading properties file '" + file.getAbsolutePath() + "'"); + e.printStackTrace(); + } + } + else + { + logger.warn("Properties file does not exist: '" + file.getAbsolutePath() + "'"); + } + } + + /** + * Loads the properties from the specified file in hdfs + * <p> + * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + * @throws IOException + */ + public static void loadPropsFromFile(String filename, FileSystem fs) throws IOException + { + Path p = new Path(filename); + loadPropsFromFile(p, fs); + } + + /** + * Loads the properties from the specified file in hdfs + * <p> + * The new properties are added to the current system configuration. + * + * @param file + * The properties file containing the system properties to add. + * @throws IOException + */ + public static void loadPropsFromFile(Path filePath, FileSystem fs) throws IOException + { + if (fs.exists(filePath)) + { + try (InputStream stream = fs.open(filePath);) + { + logger.info("Loading properties file from hdfs'" + filePath.toString() + "'"); + loadProperties(stream); + } catch (IOException e) + { + logger.error("Problem loading properties file from hdfs '" + filePath.toString() + "'"); + e.printStackTrace(); + } + } + else + { + logger.warn("Properties file does not exist: '" + filePath.toString() + "'"); + } + } + + /** + * Loads the properties from the specified resource on the current classloader. + * <p> + * The new properties are added to the current system configuration. + * + * @param name + * The name of the resource defining the properties. + */ + public static void loadPropsFromResource(String name) + { + try (InputStream stream = SystemConfiguration.class.getClassLoader().getResourceAsStream(name)) + { + if (stream != null) + { + logger.info("Loading file '" + name + "'"); + loadProperties(stream); + } + else + { + logger.error("No file found '" + name + "'"); + } + } catch (IOException e) + { + logger.error("Problem loading file '" + name + "'"); + e.printStackTrace(); + } + } + + /** + * Load the properties in the Properties object and then trim any whitespace + * <p> + * Properties.load does not do this automatically + * + * @throws IOException + */ + public static void loadProperties(InputStream stream) throws IOException + { + props.load(stream); + + Enumeration propKeys = props.propertyNames(); + while (propKeys.hasMoreElements()) + { + String tmpKey = (String) propKeys.nextElement(); + String tmpValue = props.getProperty(tmpKey); + tmpValue = tmpValue.trim(); + props.put(tmpKey, tmpValue); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/resources/pirk.properties ---------------------------------------------------------------------- diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties index a88c846..543b8b1 100755 --- a/src/main/resources/pirk.properties +++ b/src/main/resources/pirk.properties @@ -69,6 +69,12 @@ data.schemas = none ## <filterNames> ## <name> (optional) element name of element in the data schema to apply pre-processing filters </name> ## </filterNames> +## <additional> (optional) additional fields for the query schema, in <key,value> pairs +## <field> +## <key> key corresponding the the field </key> +## <value> value corresponding to the field </value> +## </field> +## </additional> ## </schema> ## ## http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/main/resources/query-schema.xsd ---------------------------------------------------------------------- diff --git a/src/main/resources/query-schema.xsd b/src/main/resources/query-schema.xsd index 65a36ce..db339b7 100644 --- a/src/main/resources/query-schema.xsd +++ b/src/main/resources/query-schema.xsd @@ -1,119 +1,158 @@ <?xml version="1.0" encoding="UTF-8" ?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> +<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with ~ + this work for additional information regarding copyright ownership. ~ The + ASF licenses this file to You under the Apache License, Version 2.0 ~ (the + "License"); you may not use this file except in compliance with ~ the License. + You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ ~ Unless required by applicable law or agreed to in writing, software ~ + distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the + License for the specific language governing permissions and ~ limitations + under the License. --> <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" - targetNamespace="http://pirk.apache.org" xmlns="http://pirk.apache.org" - elementFormDefault="qualified"> + targetNamespace="http://pirk.apache.org" xmlns="http://pirk.apache.org" + elementFormDefault="qualified"> - <xs:element name="schema"> - <xs:complexType> - <xs:sequence> - <xs:element name="schemaName" type="xs:string"> - <xs:annotation> - <xs:documentation> - The name of the query schema. - The name omits leading and trailing - whitespace, and is case sensitive. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="schema"> + <xs:complexType> + <xs:sequence> + <xs:element name="schemaName" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of the query schema. + The name omits + leading and trailing + whitespace, and is case sensitive. + </xs:documentation> + </xs:annotation> + </xs:element> - <xs:element name="dataSchemaName" type="xs:string"> - <xs:annotation> - <xs:documentation> - The name of the data schema - over which this query is run. The name omits - leading and trailing whitespace, and is case - sensitive. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="dataSchemaName" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of the data schema + over which this query + is run. The name omits + leading and trailing whitespace, and is + case + sensitive. + </xs:documentation> + </xs:annotation> + </xs:element> - <xs:element name="selectorName" type="xs:string"> - <xs:annotation> - <xs:documentation>The name of the name of the - element in the data schema that will be the - selector for this query. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="selectorName" type="xs:string"> + <xs:annotation> + <xs:documentation>The name of the name of the + element in the data + schema that will be the + selector for this query. + </xs:documentation> + </xs:annotation> + </xs:element> - <xs:element name="elements"> - <xs:annotation> - <xs:documentation> - The set of element names to - include in the query response. - </xs:documentation> - </xs:annotation> - <xs:complexType> - <xs:sequence> + <xs:element name="elements"> + <xs:annotation> + <xs:documentation> + The set of element names to + include in the query + response. + </xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> - <xs:element name="name" type="xs:string" - maxOccurs="unbounded"> - <xs:annotation> - <xs:documentation> - The name of an - element in the data schema to - include in the query response. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="name" type="xs:string" maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + The name of an + element in the data schema to + include in the query response. + </xs:documentation> + </xs:annotation> + </xs:element> - </xs:sequence> - </xs:complexType> - </xs:element> + </xs:sequence> + </xs:complexType> + </xs:element> - <xs:element name="filter" type="xs:string" - minOccurs="0"> - <xs:annotation> - <xs:documentation> - The name of a class used to - filter the query response data. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="filter" type="xs:string" minOccurs="0"> + <xs:annotation> + <xs:documentation> + The name of a class used to + filter the query + response data. + </xs:documentation> + </xs:annotation> + </xs:element> - <xs:element name="filterNames" minOccurs="0" - maxOccurs="unbounded"> - <xs:annotation> - <xs:documentation> - The set of data element names - over which the - response filter is applied. - </xs:documentation> - </xs:annotation> - <xs:complexType> - <xs:sequence> + <xs:element name="filterNames" minOccurs="0" maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + The set of data element names + over which the + response filter is applied. + </xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> - <xs:element name="name" type="xs:string"> - <xs:annotation> - <xs:documentation> - The name of an - element in the data schema over - which to apply the filter. - </xs:documentation> - </xs:annotation> - </xs:element> + <xs:element name="name" type="xs:string"> + <xs:annotation> + <xs:documentation> + The name of an + element in the data schema over + which to apply the filter. + </xs:documentation> + </xs:annotation> + </xs:element> - </xs:sequence> - </xs:complexType> - </xs:element> - </xs:sequence> - </xs:complexType> - </xs:element> + </xs:sequence> + </xs:complexType> + </xs:element> + + <xs:element name="additional" minOccurs="0"> + <xs:annotation> + <xs:documentation> + Additional set of fields to include in the query + schema. + </xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> + + <xs:element name="field" maxOccurs="unbounded"> + <xs:annotation> + <xs:documentation> + Additional field to include in the query schema + </xs:documentation> + </xs:annotation> + <xs:complexType> + <xs:sequence> + <xs:element name="key" type="xs:string"> + <xs:annotation> + <xs:documentation> + The key corresponding to the the field + </xs:documentation> + </xs:annotation> + </xs:element> + <xs:element name="value" type="xs:string"> + <xs:annotation> + <xs:documentation> + The value corresponding to the field + </xs:documentation> + </xs:annotation> + </xs:element> + </xs:sequence> + </xs:complexType> + </xs:element> + + </xs:sequence> + </xs:complexType> + </xs:element> + + </xs:sequence> + </xs:complexType> + </xs:element> </xs:schema> http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9d036d47/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java index bb70153..cab327e 100644 --- a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java +++ b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java @@ -326,7 +326,8 @@ public class QueryParserUtilsTest private void testBooleanQueryMapMapWritableWAW() { - assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema)); + assertTrue( + QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema)); assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:5+OR+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]", docWAW, dSchema)); assertTrue(QueryParserUtils.checkRecordWritableArrayWritable("?q=qtype:1+AND+rcode:0+AND+date:[2015-05-05T20:33:07.000Z+TO+2016-02-20T23:29:05.000Z]",
