PIRK-27 Improve Pirk's Use of Property Files - closes apache/incubator-pirk#27
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/1b38ea67 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/1b38ea67 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/1b38ea67 Branch: refs/heads/master Commit: 1b38ea673d47c8d1b1f6137f1406c2b43b618efe Parents: de7a9c8 Author: eawilliams <[email protected]> Authored: Thu Jul 28 22:19:18 2016 -0400 Committer: charris <[email protected]> Committed: Thu Jul 28 22:19:18 2016 -0400 ---------------------------------------------------------------------- .../pirk/querier/wideskies/QuerierDriver.java | 37 +- .../querier/wideskies/QuerierDriverCLI.java | 319 ++++----------- .../pirk/querier/wideskies/QuerierProps.java | 220 +++++++++++ .../pirk/responder/wideskies/ResponderCLI.java | 392 ++++--------------- .../responder/wideskies/ResponderDriver.java | 16 +- .../responder/wideskies/ResponderProps.java | 230 +++++++++++ .../wideskies/spark/ComputeResponse.java | 3 +- .../pirk/schema/data/DataSchemaLoader.java | 2 +- .../pirk/schema/query/QuerySchemaLoader.java | 2 +- .../distributed/testsuite/DistTestSuite.java | 46 +-- .../java/org/apache/pirk/test/utils/Inputs.java | 2 +- .../apache/pirk/utils/SystemConfiguration.java | 175 ++++++--- src/main/resources/pirk.properties | 7 +- src/main/resources/querier.properties | 123 ++++++ src/main/resources/responder.properties | 137 +++++++ 15 files changed, 1038 insertions(+), 673 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java index 44f2ba0..5e5c957 100644 --- 
a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java @@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory; public class QuerierDriver implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(QuerierDriver.class); public static void main(String... args) throws IOException, InterruptedException, PIRException @@ -104,26 +105,26 @@ public class QuerierDriver implements Serializable QuerierDriverCLI qdriverCLI = new QuerierDriverCLI(args); // Set the variables - action = qdriverCLI.getOptionValue(QuerierDriverCLI.ACTION); - inputFile = qdriverCLI.getOptionValue(QuerierDriverCLI.INPUTFILE); - outputFile = qdriverCLI.getOptionValue(QuerierDriverCLI.OUTPUTFILE); - numThreads = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.NUMTHREADS)); + action = SystemConfiguration.getProperty(QuerierProps.ACTION); + inputFile = SystemConfiguration.getProperty(QuerierProps.INPUTFILE); + outputFile = SystemConfiguration.getProperty(QuerierProps.OUTPUTFILE); + numThreads = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.NUMTHREADS)); if (action.equals("encrypt")) { - queryType = qdriverCLI.getOptionValue(QuerierDriverCLI.TYPE); - queryName = qdriverCLI.getOptionValue(QuerierDriverCLI.QUERYNAME); - hashBitSize = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.HASHBITSIZE)); - hashKey = qdriverCLI.getOptionValue(QuerierDriverCLI.HASHBITSIZE); - dataPartitionBitSize = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.DATAPARTITIONSIZE)); - paillierBitSize = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.PAILLIERBITSIZE)); - certainty = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.CERTAINTY)); - embedSelector = SystemConfiguration.getProperty(QuerierDriverCLI.EMBEDSELECTOR, "true").equals("true"); - useMemLookupTable = 
SystemConfiguration.getProperty(QuerierDriverCLI.USEMEMLOOKUPTABLE, "false").equals("true"); - useHDFSLookupTable = SystemConfiguration.getProperty(QuerierDriverCLI.USEHDFSLOOKUPTABLE, "false").equals("true"); - - if (qdriverCLI.hasOption(QuerierDriverCLI.BITSET)) + queryType = SystemConfiguration.getProperty(QuerierProps.QUERYTYPE); + queryName = SystemConfiguration.getProperty(QuerierProps.QUERYNAME); + hashBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE)); + hashKey = SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE); + dataPartitionBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.DATAPARTITIONSIZE)); + paillierBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.PAILLIERBITSIZE)); + certainty = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.CERTAINTY)); + embedSelector = SystemConfiguration.getProperty(QuerierProps.EMBEDSELECTOR, "true").equals("true"); + useMemLookupTable = SystemConfiguration.getProperty(QuerierProps.USEMEMLOOKUPTABLE, "false").equals("true"); + useHDFSLookupTable = SystemConfiguration.getProperty(QuerierProps.USEHDFSLOOKUPTABLE, "false").equals("true"); + + if (SystemConfiguration.hasProperty(QuerierProps.BITSET)) { - bitSet = Integer.parseInt(qdriverCLI.getOptionValue(QuerierDriverCLI.BITSET)); + bitSet = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.BITSET)); logger.info("bitSet = " + bitSet); } @@ -146,7 +147,7 @@ public class QuerierDriver implements Serializable } if (action.equals("decrypt")) { - querierFile = qdriverCLI.getOptionValue(QuerierDriverCLI.QUERIERFILE); + querierFile = SystemConfiguration.getProperty(QuerierProps.QUERIERFILE); } // Perform the action http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java index b6dc2a7..a5ea321 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java @@ -18,14 +18,14 @@ */ package org.apache.pirk.querier.wideskies; +import java.io.File; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.pirk.schema.data.DataSchemaLoader; -import org.apache.pirk.schema.query.QuerySchemaLoader; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,32 +40,7 @@ public class QuerierDriverCLI private Options cliOptions = null; private CommandLine commandLine = null; - // General variables - public static final String ACTION = "action"; - public static final String INPUTFILE = "inputFile"; - public static final String OUTPUTFILE = "outputFile"; - public static final String TYPE = "queryType"; - public static final String NUMTHREADS = "numThreads"; - public static final String EMBEDQUERYSCHEMA = "embedQuerySchema"; - - // Encryption variables - public static final String HASHBITSIZE = "hashBitSize"; - public static final String HASHKEY = "hashKey"; - public static final String DATAPARTITIONSIZE = "dataPartitionBitSize"; - public static final String PAILLIERBITSIZE = "paillierBitSize"; - public static final String BITSET = "bitSet"; - public static final String CERTAINTY = "certainty"; - public static final String QUERYNAME = "queryName"; - public static final String QUERYSCHEMAS = "querySchemas"; - public static final String DATASCHEMAS = "dataSchemas"; - public static final String EMBEDSELECTOR = "embedSelector"; - public static final 
String USEMEMLOOKUPTABLE = "memLookupTable"; - public static final String USEHDFSLOOKUPTABLE = "useHDFSLookupTable"; - public static final String SR_ALGORITHM = "secureRandomAlg"; - public static final String SR_PROVIDER = "secureRandomProvider"; - - // Decryption variables - public static String QUERIERFILE = "querierFile"; + private static final String LOCALPROPFILE = "local.querier.properties"; /** * Create and parse allowable options @@ -136,192 +111,25 @@ public class QuerierDriverCLI { boolean valid = true; - // Parse general required options - if (!hasOption(ACTION)) - { - logger.info("Must have the option " + ACTION); - return false; - } - String action = getOptionValue(ACTION).toLowerCase(); - if (!action.equals("encrypt") && !action.equals("decrypt")) - { - logger.info("Unsupported action: " + action); - } - SystemConfiguration.setProperty(ACTION, action); - - if (!hasOption(INPUTFILE)) - { - logger.info("Must have the option " + INPUTFILE); - return false; - } - SystemConfiguration.setProperty(INPUTFILE, getOptionValue(INPUTFILE)); - - if (!hasOption(OUTPUTFILE)) - { - logger.info("Must have the option " + OUTPUTFILE); - return false; - } - SystemConfiguration.setProperty(OUTPUTFILE, getOptionValue(OUTPUTFILE)); - - if (!hasOption(NUMTHREADS)) - { - logger.info("Must have the option " + NUMTHREADS); - return false; - } - SystemConfiguration.setProperty(NUMTHREADS, getOptionValue(NUMTHREADS)); - - if (!hasOption(QUERYSCHEMAS)) - { - logger.info("Must have the option " + QUERYSCHEMAS); - return false; - } - SystemConfiguration.setProperty("query.schemas", getOptionValue(QUERYSCHEMAS)); - - if (!hasOption(DATASCHEMAS)) - { - logger.info("Must have the option " + DATASCHEMAS); - return false; - } - SystemConfiguration.setProperty("data.schemas", getOptionValue(DATASCHEMAS)); - - // Parse general optional args - if (hasOption(EMBEDQUERYSCHEMA)) + // If we have a local.querier.properties file specified, load it + if (hasOption(LOCALPROPFILE)) { - 
SystemConfiguration.setProperty("pir.embedQuerySchema", getOptionValue(EMBEDQUERYSCHEMA)); + SystemConfiguration.loadPropsFromFile(new File(getOptionValue(LOCALPROPFILE))); } else { - SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); - } - - // Parse encryption args - if (action.equals("encrypt")) - { - if (!hasOption(TYPE)) - { - logger.info("Must have the option " + TYPE); - return false; - } - SystemConfiguration.setProperty(TYPE, getOptionValue(TYPE)); - - if (!hasOption(HASHBITSIZE)) - { - logger.info("Must have the option " + HASHBITSIZE); - return false; - } - SystemConfiguration.setProperty(HASHBITSIZE, getOptionValue(HASHBITSIZE)); - - if (!hasOption(HASHKEY)) - { - logger.info("Must have the option " + HASHKEY); - return false; - } - SystemConfiguration.setProperty(HASHKEY, getOptionValue(HASHKEY)); - - if (!hasOption(DATAPARTITIONSIZE)) - { - logger.info("Must have the option " + DATAPARTITIONSIZE); - return false; - } - SystemConfiguration.setProperty(DATAPARTITIONSIZE, getOptionValue(DATAPARTITIONSIZE)); - - if (!hasOption(PAILLIERBITSIZE)) - { - logger.info("Must have the option " + PAILLIERBITSIZE); - return false; - } - SystemConfiguration.setProperty(PAILLIERBITSIZE, getOptionValue(PAILLIERBITSIZE)); - - if (!hasOption(CERTAINTY)) - { - logger.info("Must have the option " + CERTAINTY); - return false; - } - SystemConfiguration.setProperty(CERTAINTY, getOptionValue(CERTAINTY)); - - if (!hasOption(QUERYNAME)) + // Pull options, set as properties + for (String prop : QuerierProps.PROPSLIST) { - logger.info("Must have the option " + QUERYNAME); - return false; + if (hasOption(prop)) + { + SystemConfiguration.setProperty(prop, getOptionValue(prop)); + } } - SystemConfiguration.setProperty(QUERYNAME, getOptionValue(QUERYNAME)); - - if (!hasOption(BITSET)) - { - logger.info("Must have the option " + BITSET); - return false; - } - SystemConfiguration.setProperty(BITSET, getOptionValue(BITSET)); - - if (!hasOption(EMBEDSELECTOR)) - { - 
SystemConfiguration.setProperty(EMBEDSELECTOR, "true"); - } - else - { - SystemConfiguration.setProperty(EMBEDSELECTOR, getOptionValue(EMBEDSELECTOR)); - } - - if (!hasOption(USEMEMLOOKUPTABLE)) - { - SystemConfiguration.setProperty(USEMEMLOOKUPTABLE, "false"); - } - else - { - SystemConfiguration.setProperty(USEMEMLOOKUPTABLE, getOptionValue(USEMEMLOOKUPTABLE)); - } - - if (!hasOption(USEHDFSLOOKUPTABLE)) - { - SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false"); - } - else - { - SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, getOptionValue(USEHDFSLOOKUPTABLE)); - } - - if (!hasOption(SR_ALGORITHM)) - { - SystemConfiguration.setProperty("pallier.secureRandom.algorithm", "NativePRNG"); - } - else - { - SystemConfiguration.setProperty("pallier.secureRandom.algorithm", getOptionValue(SR_ALGORITHM)); - } - - if (!hasOption(SR_PROVIDER)) - { - SystemConfiguration.setProperty("pallier.secureRandom.provider", "SUN"); - } - else - { - SystemConfiguration.setProperty("pallier.secureRandom.provider", getOptionValue(SR_PROVIDER)); - } - } - - // Parse decryption args - if (action.equals("decrypt")) - { - if (!hasOption(QUERIERFILE)) - { - logger.info("Must have the option " + QUERIERFILE); - return false; - } - SystemConfiguration.setProperty(QUERIERFILE, QUERIERFILE); } - // Load the new local query and data schemas - logger.info("loading schemas: dataSchemas = " + SystemConfiguration.getProperty("data.schemas") + " querySchemas = " - + SystemConfiguration.getProperty("query.schemas")); - try - { - DataSchemaLoader.initialize(); - QuerySchemaLoader.initialize(); - - } catch (Exception e) - { - e.printStackTrace(); - } + // Validate properties + valid = QuerierProps.validateQuerierProperties(); return valid; } @@ -340,165 +148,174 @@ public class QuerierDriverCLI optionHelp.setRequired(false); options.addOption(optionHelp); + // local.querier.properties + Option optionLocalPropFile = new Option("localPropFile", LOCALPROPFILE, true, "Optional local properties 
file"); + optionLocalPropFile.setRequired(false); + optionLocalPropFile.setArgName(LOCALPROPFILE); + optionLocalPropFile.setType(String.class); + options.addOption(optionLocalPropFile); + // ACTION - Option optionACTION = new Option("a", ACTION, true, "required - 'encrypt' or 'decrypt' -- The action performed by the QuerierDriver"); + Option optionACTION = new Option("a", QuerierProps.ACTION, true, "required - 'encrypt' or 'decrypt' -- The action performed by the QuerierDriver"); optionACTION.setRequired(false); - optionACTION.setArgName(ACTION); + optionACTION.setArgName(QuerierProps.ACTION); optionACTION.setType(String.class); options.addOption(optionACTION); // INPUTFILE - Option optionINPUTFILE = new Option("i", INPUTFILE, true, "required - Fully qualified file containing input " + Option optionINPUTFILE = new Option("i", QuerierProps.INPUTFILE, true, "required - Fully qualified file containing input " + "-- \n The input is either: \n (1) For Encryption: A query file - Contains the query selectors, one per line; " + "the first line must be the query number \n OR \n (2) For Decryption: A response file - Contains the serialized Response object"); optionINPUTFILE.setRequired(false); - optionINPUTFILE.setArgName(INPUTFILE); + optionINPUTFILE.setArgName(QuerierProps.INPUTFILE); optionINPUTFILE.setType(String.class); options.addOption(optionINPUTFILE); // OUTPUTFILE - Option optionOUTPUTFILE = new Option("o", OUTPUTFILE, true, "required - Fully qualified file for the result output. " + Option optionOUTPUTFILE = new Option("o", QuerierProps.OUTPUTFILE, true, "required - Fully qualified file for the result output. 
" + "\n The output file specifies either: \n (1) For encryption: \n \t (a) A file to contain the serialized Querier object named: " + "<outputFile>-" + QuerierConst.QUERIER_FILETAG + " AND \n \t " + "(b) A file to contain the serialized Query object named: <outputFile>-" + QuerierConst.QUERY_FILETAG + "\n " + "OR \n (2) A file to contain the decryption results where each line is where each line " + "corresponds to one hit and is a JSON object with the schema QuerySchema"); optionOUTPUTFILE.setRequired(false); - optionOUTPUTFILE.setArgName(OUTPUTFILE); + optionOUTPUTFILE.setArgName(QuerierProps.OUTPUTFILE); optionOUTPUTFILE.setType(String.class); options.addOption(optionOUTPUTFILE); // NUMTHREADS - Option optionNUMTHREADS = new Option("nt", NUMTHREADS, true, "required -- Number of threads to use for encryption/decryption"); + Option optionNUMTHREADS = new Option("nt", QuerierProps.NUMTHREADS, true, "required -- Number of threads to use for encryption/decryption"); optionNUMTHREADS.setRequired(false); - optionNUMTHREADS.setArgName(NUMTHREADS); + optionNUMTHREADS.setArgName(QuerierProps.NUMTHREADS); optionNUMTHREADS.setType(String.class); options.addOption(optionNUMTHREADS); - // data.schemas - Option optionDataSchemas = new Option("ds", DATASCHEMAS, true, "required -- Comma separated list of data schema file names"); + // DATASCHEMAS + Option optionDataSchemas = new Option("ds", QuerierProps.DATASCHEMAS, true, "optional -- Comma separated list of data schema file names"); optionDataSchemas.setRequired(false); - optionDataSchemas.setArgName(DATASCHEMAS); + optionDataSchemas.setArgName(QuerierProps.DATASCHEMAS); optionDataSchemas.setType(String.class); options.addOption(optionDataSchemas); - // query.schemas - Option optionQuerySchemas = new Option("qs", QUERYSCHEMAS, true, "required -- Comma separated list of query schema file names"); + // QUERYSCHEMAS + Option optionQuerySchemas = new Option("qs", QuerierProps.QUERYSCHEMAS, true, "optional -- Comma separated list of 
query schema file names"); optionQuerySchemas.setRequired(false); - optionQuerySchemas.setArgName(QUERYSCHEMAS); + optionQuerySchemas.setArgName(QuerierProps.QUERYSCHEMAS); optionQuerySchemas.setType(String.class); options.addOption(optionQuerySchemas); // TYPE - Option optionTYPE = new Option("qt", TYPE, true, "required for encryption -- Type of the query as defined " + Option optionTYPE = new Option("qt", QuerierProps.QUERYTYPE, true, "required for encryption -- Type of the query as defined " + "in the 'schemaName' tag of the corresponding query schema file"); optionTYPE.setRequired(false); - optionTYPE.setArgName(TYPE); + optionTYPE.setArgName(QuerierProps.QUERYTYPE); optionTYPE.setType(String.class); options.addOption(optionTYPE); // NAME - Option optionNAME = new Option("qn", QUERYNAME, true, "required for encryption -- Name of the query"); + Option optionNAME = new Option("qn", QuerierProps.QUERYNAME, true, "required for encryption -- Name of the query"); optionNAME.setRequired(false); - optionNAME.setArgName(QUERYNAME); + optionNAME.setArgName(QuerierProps.QUERYNAME); optionNAME.setType(String.class); options.addOption(optionNAME); // HASHBITSIZE - Option optionHASHBITSIZE = new Option("hb", HASHBITSIZE, true, "required -- Bit size of keyed hash"); + Option optionHASHBITSIZE = new Option("hb", QuerierProps.HASHBITSIZE, true, "required -- Bit size of keyed hash"); optionHASHBITSIZE.setRequired(false); - optionHASHBITSIZE.setArgName(HASHBITSIZE); + optionHASHBITSIZE.setArgName(QuerierProps.HASHBITSIZE); optionHASHBITSIZE.setType(String.class); options.addOption(optionHASHBITSIZE); // HASHKEY - Option optionHASHKEY = new Option("hk", HASHKEY, true, "required for encryption -- String key for the keyed hash functionality"); + Option optionHASHKEY = new Option("hk", QuerierProps.HASHKEY, true, "required for encryption -- String key for the keyed hash functionality"); optionHASHKEY.setRequired(false); - optionHASHKEY.setArgName(HASHKEY); + 
optionHASHKEY.setArgName(QuerierProps.HASHKEY); optionHASHKEY.setType(String.class); options.addOption(optionHASHKEY); // DATAPARTITIONSIZE - Option optionDATAPARTITIONSIZE = new Option("dps", DATAPARTITIONSIZE, true, "required for encryption -- Partition bit size in data partitioning"); + Option optionDATAPARTITIONSIZE = new Option("dps", QuerierProps.DATAPARTITIONSIZE, true, + "required for encryption -- Partition bit size in data partitioning"); optionDATAPARTITIONSIZE.setRequired(false); - optionDATAPARTITIONSIZE.setArgName(DATAPARTITIONSIZE); + optionDATAPARTITIONSIZE.setArgName(QuerierProps.DATAPARTITIONSIZE); optionDATAPARTITIONSIZE.setType(String.class); options.addOption(optionDATAPARTITIONSIZE); // PAILLIERBITSIZE - Option optionPAILLIERBITSIZE = new Option("pbs", PAILLIERBITSIZE, true, "required for encryption -- Paillier modulus size N"); + Option optionPAILLIERBITSIZE = new Option("pbs", QuerierProps.PAILLIERBITSIZE, true, "required for encryption -- Paillier modulus size N"); optionPAILLIERBITSIZE.setRequired(false); - optionPAILLIERBITSIZE.setArgName(PAILLIERBITSIZE); + optionPAILLIERBITSIZE.setArgName(QuerierProps.PAILLIERBITSIZE); optionPAILLIERBITSIZE.setType(String.class); options.addOption(optionPAILLIERBITSIZE); // CERTAINTY - Option optionCERTAINTY = new Option("c", CERTAINTY, true, + Option optionCERTAINTY = new Option("c", QuerierProps.CERTAINTY, true, "required for encryption -- Certainty of prime generation for Paillier -- must be greater than or " + "equal to " + SystemConfiguration.getProperty("pir.primeCertainty") + ""); optionCERTAINTY.setRequired(false); - optionCERTAINTY.setArgName(CERTAINTY); + optionCERTAINTY.setArgName(QuerierProps.CERTAINTY); optionCERTAINTY.setType(String.class); options.addOption(optionCERTAINTY); // BITSET - Option optionBITSET = new Option("b", BITSET, true, "required for encryption -- Ensure that this bit position is set in the " + Option optionBITSET = new Option("b", QuerierProps.BITSET, true, "required 
for encryption -- Ensure that this bit position is set in the " + "Paillier modulus (will generate Paillier moduli until finding one in which this bit is set)"); optionBITSET.setRequired(false); - optionBITSET.setArgName(BITSET); + optionBITSET.setArgName(QuerierProps.BITSET); optionBITSET.setType(String.class); options.addOption(optionBITSET); // embedSelector - Option optionEmbedSelector = new Option("embed", EMBEDSELECTOR, true, "required for encryption -- 'true' or 'false' - Whether or not to embed " - + "the selector in the results to reduce false positives"); + Option optionEmbedSelector = new Option("embed", QuerierProps.EMBEDSELECTOR, true, + "required for encryption -- 'true' or 'false' - Whether or not to embed " + "the selector in the results to reduce false positives"); optionEmbedSelector.setRequired(false); - optionEmbedSelector.setArgName(EMBEDSELECTOR); + optionEmbedSelector.setArgName(QuerierProps.EMBEDSELECTOR); optionEmbedSelector.setType(String.class); options.addOption(optionEmbedSelector); // useMemLookupTable - Option optionUseMemLookupTable = new Option("mlu", USEMEMLOOKUPTABLE, true, + Option optionUseMemLookupTable = new Option("mlu", QuerierProps.USEMEMLOOKUPTABLE, true, "required for encryption -- 'true' or 'false' - Whether or not to generate and use " + "an in memory modular exponentation lookup table - only for standalone/testing right now..."); optionUseMemLookupTable.setRequired(false); - optionUseMemLookupTable.setArgName(USEMEMLOOKUPTABLE); + optionUseMemLookupTable.setArgName(QuerierProps.USEMEMLOOKUPTABLE); optionUseMemLookupTable.setType(String.class); options.addOption(optionUseMemLookupTable); // useHDFSLookupTable - Option optionUseHDFSLookupTable = new Option("lu", USEHDFSLOOKUPTABLE, true, + Option optionUseHDFSLookupTable = new Option("lu", QuerierProps.USEHDFSLOOKUPTABLE, true, "required for encryption -- 'true' or 'false' -- Whether or not to generate and use " + "a hdfs modular exponentation lookup table"); 
optionUseHDFSLookupTable.setRequired(false); - optionUseHDFSLookupTable.setArgName(USEHDFSLOOKUPTABLE); + optionUseHDFSLookupTable.setArgName(QuerierProps.USEHDFSLOOKUPTABLE); optionUseHDFSLookupTable.setType(String.class); options.addOption(optionUseHDFSLookupTable); // QUERIERFILE - Option optionQUERIERFILE = new Option("qf", QUERIERFILE, true, "required for decryption -- Fully qualified file containing the serialized Querier object"); + Option optionQUERIERFILE = new Option("qf", QuerierProps.QUERIERFILE, true, + "required for decryption -- Fully qualified file containing the serialized Querier object"); optionQUERIERFILE.setRequired(false); - optionQUERIERFILE.setArgName(QUERIERFILE); + optionQUERIERFILE.setArgName(QuerierProps.QUERIERFILE); optionQUERIERFILE.setType(String.class); options.addOption(optionQUERIERFILE); // embedQuerySchema - Option optionEMBEDQUERYSCHEMA = new Option("embedQS", EMBEDQUERYSCHEMA, true, + Option optionEMBEDQUERYSCHEMA = new Option("embedQS", QuerierProps.EMBEDQUERYSCHEMA, true, "optional (defaults to false) -- Whether or not to embed the QuerySchema in the Query (via QueryInfo)"); optionEMBEDQUERYSCHEMA.setRequired(false); - optionEMBEDQUERYSCHEMA.setArgName(EMBEDQUERYSCHEMA); + optionEMBEDQUERYSCHEMA.setArgName(QuerierProps.EMBEDQUERYSCHEMA); optionEMBEDQUERYSCHEMA.setType(String.class); options.addOption(optionEMBEDQUERYSCHEMA); // SR_ALGORITHM - Option optionSR_ALGORITHM = new Option("srAlg", SR_ALGORITHM, true, "optional - specify the SecureRandom algorithm, defaults to NativePRNG"); + Option optionSR_ALGORITHM = new Option("srAlg", QuerierProps.SR_ALGORITHM, true, "optional - specify the SecureRandom algorithm, defaults to NativePRNG"); optionSR_ALGORITHM.setRequired(false); - optionSR_ALGORITHM.setArgName(SR_ALGORITHM); + optionSR_ALGORITHM.setArgName(QuerierProps.SR_ALGORITHM); optionSR_ALGORITHM.setType(String.class); options.addOption(optionSR_ALGORITHM); // SR_PROVIDERS - Option optionSR_PROVIDER = new 
Option("srProvider", SR_PROVIDER, true, "optional - specify the SecureRandom provider, defaults to SUN"); + Option optionSR_PROVIDER = new Option("srProvider", QuerierProps.SR_PROVIDER, true, "optional - specify the SecureRandom provider, defaults to SUN"); optionSR_PROVIDER.setRequired(false); - optionSR_PROVIDER.setArgName(SR_PROVIDER); + optionSR_PROVIDER.setArgName(QuerierProps.SR_PROVIDER); optionSR_PROVIDER.setType(String.class); options.addOption(optionSR_PROVIDER); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/querier/wideskies/QuerierProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierProps.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierProps.java new file mode 100644 index 0000000..e8820af --- /dev/null +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierProps.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.querier.wideskies; + +import java.util.Arrays; +import java.util.List; + +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Properties constants and validation for the Querier + */ +public class QuerierProps +{ + private static final Logger logger = LoggerFactory.getLogger(QuerierProps.class); + + // General properties + public static final String ACTION = "querier.action"; + public static final String INPUTFILE = "querier.inputFile"; + public static final String OUTPUTFILE = "querier.outputFile"; + public static final String QUERYTYPE = "querier.queryType"; + public static final String NUMTHREADS = "querier.numThreads"; + + // Encryption properties + public static final String HASHBITSIZE = "querier.hashBitSize"; + public static final String HASHKEY = "querier.hashKey"; + public static final String DATAPARTITIONSIZE = "querier.dataPartitionBitSize"; + public static final String PAILLIERBITSIZE = "querier.paillierBitSize"; + public static final String BITSET = "querier.bitSet"; + public static final String CERTAINTY = "querier.certainty"; + public static final String QUERYNAME = "querier.queryName"; + public static final String QUERYSCHEMAS = "querier.querySchemas"; + public static final String DATASCHEMAS = "querier.dataSchemas"; + public static final String EMBEDSELECTOR = "querier.embedSelector"; + public static final String USEMEMLOOKUPTABLE = "querier.memLookupTable"; + public static final String USEHDFSLOOKUPTABLE = "querier.useHDFSLookupTable"; + public static final String SR_ALGORITHM = "pallier.secureRandom.algorithm"; + public static final String SR_PROVIDER = "pallier.secureRandom.provider"; + public static final String EMBEDQUERYSCHEMA = "pir.embedQuerySchema"; + + // Decryption properties + public static final String QUERIERFILE = 
"querier.querierFile"; + + public static final List<String> PROPSLIST = Arrays.asList(ACTION, INPUTFILE, OUTPUTFILE, QUERYTYPE, NUMTHREADS, EMBEDQUERYSCHEMA, HASHBITSIZE, HASHKEY, + DATAPARTITIONSIZE, PAILLIERBITSIZE, BITSET, CERTAINTY, QUERYNAME, QUERYSCHEMAS, DATASCHEMAS, EMBEDSELECTOR, USEMEMLOOKUPTABLE, USEHDFSLOOKUPTABLE, + SR_ALGORITHM, SR_PROVIDER); + + /** + * Validates the querier properties + * + */ + public static boolean validateQuerierProperties() + { + boolean valid = true; + + // Parse general required properties + + if (!SystemConfiguration.hasProperty(ACTION)) + { + logger.info("Must have the option " + ACTION); + valid = false; + } + String action = SystemConfiguration.getProperty(ACTION).toLowerCase(); + if (!action.equals("encrypt") && !action.equals("decrypt")) + { + logger.info("Unsupported action: " + action); + valid = false; + } + + if (!SystemConfiguration.hasProperty(INPUTFILE)) + { + logger.info("Must have the option " + INPUTFILE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(OUTPUTFILE)) + { + logger.info("Must have the option " + OUTPUTFILE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(NUMTHREADS)) + { + logger.info("Must have the option " + NUMTHREADS); + valid = false; + } + + // Parse general optional properties + if (!SystemConfiguration.hasProperty(EMBEDQUERYSCHEMA)) + { + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + } + + // Parse encryption properties + + if (action.equals("encrypt")) + { + if (!SystemConfiguration.hasProperty(QUERYTYPE)) + { + logger.info("For action='encrypt': Must have the option " + QUERYTYPE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(HASHBITSIZE)) + { + logger.info("For action='encrypt': Must have the option " + HASHBITSIZE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(HASHKEY)) + { + logger.info("For action='encrypt': Must have the option " + HASHKEY); + valid = false; + } + + if 
(!SystemConfiguration.hasProperty(DATAPARTITIONSIZE)) + { + logger.info("For action='encrypt': Must have the option " + DATAPARTITIONSIZE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(PAILLIERBITSIZE)) + { + logger.info("For action='encrypt': Must have the option " + PAILLIERBITSIZE); + valid = false; + } + + if (!SystemConfiguration.hasProperty(CERTAINTY)) + { + logger.info("For action='encrypt': Must have the option " + CERTAINTY); + valid = false; + } + + if (!SystemConfiguration.hasProperty(QUERYNAME)) + { + logger.info("For action='encrypt': Must have the option " + QUERYNAME); + valid = false; + } + + if (!SystemConfiguration.hasProperty(BITSET)) + { + logger.info("For action='encrypt': Must have the option " + BITSET); + valid = false; + } + + if (SystemConfiguration.hasProperty(QUERYSCHEMAS)) + { + SystemConfiguration.appendProperty("query.schemas", SystemConfiguration.getProperty(QUERYSCHEMAS)); + } + + if (SystemConfiguration.hasProperty(DATASCHEMAS)) + { + SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS)); + } + + if (!SystemConfiguration.hasProperty(EMBEDSELECTOR)) + { + SystemConfiguration.setProperty(EMBEDSELECTOR, "true"); + } + + if (!SystemConfiguration.hasProperty(USEMEMLOOKUPTABLE)) + { + SystemConfiguration.setProperty(USEMEMLOOKUPTABLE, "false"); + } + + if (!SystemConfiguration.hasProperty(USEHDFSLOOKUPTABLE)) + { + SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false"); + } + } + + // Parse decryption args + if (action.equals("decrypt")) + { + if (!SystemConfiguration.hasProperty(QUERIERFILE)) + { + logger.info("For action='decrypt': Must have the option " + QUERIERFILE); + valid = false; + } + } + + // Load the new local query and data schemas + if (valid) + { + logger.info("loading schemas: dataSchemas = " + SystemConfiguration.getProperty("data.schemas") + " querySchemas = " + + SystemConfiguration.getProperty("query.schemas")); + try + { + 
DataSchemaLoader.initialize(); + QuerySchemaLoader.initialize(); + + } catch (Exception e) + { + e.printStackTrace(); + } + } + + return valid; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java index d38867e..6a92f63 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java @@ -18,15 +18,14 @@ */ package org.apache.pirk.responder.wideskies; +import java.io.File; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.pirk.inputformat.hadoop.InputFormatConst; -import org.apache.pirk.schema.data.DataSchemaLoader; -import org.apache.pirk.schema.query.QuerySchemaLoader; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,36 +40,7 @@ public class ResponderCLI private Options cliOptions = null; private CommandLine commandLine = null; - // Required args - public static final String PLATFORM = "platform"; - public static final String QUERYINPUT = "queryInput"; - public static final String DATAINPUTFORMAT = "dataInputFormat"; - public static final String INPUTDATA = "inputData"; - public static final String BASEQUERY = "baseQuery"; - public static final String ESRESOURCE = "esResource"; - public static final String ESQUERY = "esQuery"; - public static final String OUTPUTFILE = "outputFile"; - - // Optional args - public static final String BASEINPUTFORMAT = "baseInputFormat"; - public static 
final String STOPLISTFILE = "stopListFile"; - private static final String NUMREDUCETASKS = "numReduceTasks"; - public static final String USELOCALCACHE = "useLocalCache"; - public static final String LIMITHITSPERSELECTOR = "limitHitsPerSelector"; - public static final String MAXHITSPERSELECTOR = "maxHitsPerSelector"; - private static final String MAPMEMORY = "mapreduceMapMemoryMb"; - private static final String REDUCEMEMORY = "mapreduceReduceMemoryMb"; - private static final String MAPJAVAOPTS = "mapreduceMapJavaOpts"; - private static final String REDUCEJAVAOPTS = "mapreduceReduceJavaOpts"; - public static final String QUERYSCHEMAS = "querySchemas"; - public static final String DATASCHEMAS = "dataSchemas"; - public static final String NUMEXPLOOKUPPARTS = "numExpLookupPartitions"; - private static final String USEHDFSLOOKUPTABLE = "useHDFSLookupTable"; - private static final String NUMDATAPARTITIONS = "numDataPartitions"; - public static final String NUMCOLMULTPARTITIONS = "numColMultPartitions"; - public static final String USEMODEXPJOIN = "useModExpJoin"; - public static final String COLMULTREDUCEBYKEY = "colMultReduceByKey"; - public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "allowAdHocQuerySchemas"; + private static final String LOCALPROPFILE = "local.responder.properties"; /** * Create and parse allowable options @@ -141,228 +111,25 @@ public class ResponderCLI { boolean valid = true; - // Parse general required options - if (!hasOption(PLATFORM)) - { - logger.info("Must have the option " + PLATFORM); - return false; - } - String platform = getOptionValue(PLATFORM).toLowerCase(); - if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone")) - { - logger.info("Unsupported platform: " + platform); - return false; - } - SystemConfiguration.setProperty("platform", getOptionValue(PLATFORM)); - - if (!hasOption(QUERYINPUT)) - { - logger.info("Must have the option " + QUERYINPUT); - return false; - } - 
SystemConfiguration.setProperty("pir.queryInput", getOptionValue(QUERYINPUT)); - - if (!hasOption(OUTPUTFILE)) - { - logger.info("Must have the option " + OUTPUTFILE); - return false; - } - SystemConfiguration.setProperty("pir.outputFile", getOptionValue(OUTPUTFILE)); - - if (!hasOption(QUERYSCHEMAS)) - { - logger.info("Must have the option " + QUERYSCHEMAS); - return false; - } - SystemConfiguration.setProperty("query.schemas", getOptionValue(QUERYSCHEMAS)); - - if (!hasOption(DATASCHEMAS)) + // If we have a local.querier.properties file specified, load it + if (hasOption(LOCALPROPFILE)) { - logger.info("Must have the option " + DATASCHEMAS); - return false; + SystemConfiguration.loadPropsFromFile(new File(getOptionValue(LOCALPROPFILE))); } - SystemConfiguration.setProperty("data.schemas", getOptionValue(DATASCHEMAS)); - - if (!hasOption(DATAINPUTFORMAT)) - { - logger.info("Must have the option " + DATAINPUTFORMAT); - return false; - } - String dataInputFormat = getOptionValue(DATAINPUTFORMAT).toLowerCase(); - SystemConfiguration.setProperty("pir.dataInputFormat", dataInputFormat); - - // Parse required options by dataInputFormat - if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT)) - { - if (!hasOption(BASEINPUTFORMAT)) - { - logger.info("Must have the option " + BASEINPUTFORMAT + " if using " + InputFormatConst.BASE_FORMAT); - return false; - } - SystemConfiguration.setProperty("pir.baseInputFormat", getOptionValue(BASEINPUTFORMAT)); - - if (!hasOption(INPUTDATA)) - { - logger.info("Must have the option " + INPUTDATA + " if using " + InputFormatConst.BASE_FORMAT); - return false; - } - SystemConfiguration.setProperty("pir.inputData", getOptionValue(INPUTDATA)); - - if (hasOption(BASEQUERY)) - { - SystemConfiguration.setProperty("pir.baseQuery", getOptionValue(BASEQUERY)); - } - else - { - SystemConfiguration.setProperty("pir.baseQuery", "?q=*"); - } - } - else if (dataInputFormat.equals(InputFormatConst.ES)) - { - if (!hasOption(ESRESOURCE)) - { - 
logger.info("Must have the option " + ESRESOURCE); - return false; - } - SystemConfiguration.setProperty("pir.esResource", getOptionValue(ESRESOURCE)); - - if (!hasOption(ESQUERY)) - { - logger.info("Must have the option " + ESQUERY); - return false; - } - SystemConfiguration.setProperty("pir.esQuery", getOptionValue(ESQUERY)); - } - else if (dataInputFormat.equalsIgnoreCase("standalone")) + else { - if (!hasOption(INPUTDATA)) + // Pull options, set as properties + for (String prop : ResponderProps.PROPSLIST) { - logger.info("Must have the option " + INPUTDATA + " if using " + InputFormatConst.BASE_FORMAT); - return false; + if (hasOption(prop)) + { + SystemConfiguration.setProperty(prop, getOptionValue(prop)); + } } - SystemConfiguration.setProperty("pir.inputData", getOptionValue(INPUTDATA)); - } - else - { - logger.info("Unsupported inputFormat = " + dataInputFormat); - return false; - } - - // Parse optional args - if (hasOption(STOPLISTFILE)) - { - SystemConfiguration.setProperty("pir.stopListFile", getOptionValue(STOPLISTFILE)); } - if (hasOption(NUMREDUCETASKS)) - { - SystemConfiguration.setProperty("pir.numReduceTasks", getOptionValue(NUMREDUCETASKS)); - } - - if (hasOption(USELOCALCACHE)) - { - SystemConfiguration.setProperty("pir.useLocalCache", getOptionValue(USELOCALCACHE)); - } - - if (hasOption(LIMITHITSPERSELECTOR)) - { - SystemConfiguration.setProperty("pir.limitHitsPerSelector", getOptionValue(LIMITHITSPERSELECTOR)); - } - - if (hasOption(MAXHITSPERSELECTOR)) - { - SystemConfiguration.setProperty("pir.maxHitsPerSelector", getOptionValue(MAXHITSPERSELECTOR)); - } - - if (hasOption(MAPMEMORY)) - { - SystemConfiguration.setProperty("mapreduce.map.memory.mb", getOptionValue(MAPMEMORY)); - } - - if (hasOption(REDUCEMEMORY)) - { - SystemConfiguration.setProperty("mapreduce.reduce.memory.mb", getOptionValue(REDUCEMEMORY)); - } - - if (hasOption(MAPJAVAOPTS)) - { - SystemConfiguration.setProperty("mapreduce.map.java.opts", getOptionValue(MAPJAVAOPTS)); - } 
- - if (hasOption(REDUCEJAVAOPTS)) - { - SystemConfiguration.setProperty("mapreduce.reduce.java.opts", getOptionValue(REDUCEJAVAOPTS)); - } - - if (hasOption(NUMEXPLOOKUPPARTS)) - { - SystemConfiguration.setProperty("pir.numExpLookupPartitions", getOptionValue(NUMEXPLOOKUPPARTS)); - } - - if (hasOption(USEHDFSLOOKUPTABLE)) - { - SystemConfiguration.setProperty("pir.useHDFSLookupTable", getOptionValue(USEHDFSLOOKUPTABLE)); - } - else - { - SystemConfiguration.setProperty("pir.useHDFSLookupTable", "false"); - } - - if (hasOption(USEMODEXPJOIN)) - { - SystemConfiguration.setProperty("pir.useModExpJoin", getOptionValue(USEMODEXPJOIN)); - } - else - { - SystemConfiguration.setProperty("pir.useModExpJoin", "false"); - } - - if (hasOption(NUMDATAPARTITIONS)) - { - SystemConfiguration.setProperty("pir.numDataPartitions", getOptionValue(NUMDATAPARTITIONS)); - } - else - { - SystemConfiguration.setProperty("pir.numDataPartitions", "1000"); - } - - if (hasOption(NUMCOLMULTPARTITIONS)) - { - SystemConfiguration.setProperty("pir.numColMultPartitions", getOptionValue(NUMCOLMULTPARTITIONS)); - } - else - { - SystemConfiguration.setProperty("pir.numColMultPartitions", "1000"); - } - - if (hasOption(COLMULTREDUCEBYKEY)) - { - SystemConfiguration.setProperty("pir.colMultReduceByKey", getOptionValue(COLMULTREDUCEBYKEY)); - } - else - { - SystemConfiguration.setProperty("pir.colMultReduceByKey", "false"); - } - - if (hasOption(ALLOWEMBEDDEDQUERYSCHEMAS)) - { - SystemConfiguration.setProperty("pir.allowEmbeddedQuerySchemas", getOptionValue(ALLOWEMBEDDEDQUERYSCHEMAS)); - } - else - { - SystemConfiguration.setProperty("pir.allowEmbeddedQuerySchemas", "false"); - } - - // Load the new local query and data schemas - try - { - DataSchemaLoader.initialize(); - QuerySchemaLoader.initialize(); - - } catch (Exception e) - { - e.printStackTrace(); - } + // Validate properties + valid = ResponderProps.validateResponderProperties(); return valid; } @@ -381,205 +148,218 @@ public class ResponderCLI 
optionHelp.setRequired(false); options.addOption(optionHelp); + // local.querier.properties + Option optionLocalPropFile = new Option("localPropFile", LOCALPROPFILE, true, "Optional local properties file"); + optionLocalPropFile.setRequired(false); + optionLocalPropFile.setArgName(LOCALPROPFILE); + optionLocalPropFile.setType(String.class); + options.addOption(optionLocalPropFile); + // platform - Option optionPlatform = new Option("p", PLATFORM, true, + Option optionPlatform = new Option("p", ResponderProps.PLATFORM, true, "required -- 'mapreduce', 'spark', or 'standalone' : Processing platform technology for the responder"); optionPlatform.setRequired(false); - optionPlatform.setArgName(PLATFORM); + optionPlatform.setArgName(ResponderProps.PLATFORM); optionPlatform.setType(String.class); options.addOption(optionPlatform); // queryInput - Option optionQueryInput = new Option("q", QUERYINPUT, true, "required -- Fully qualified dir in hdfs of Query files"); + Option optionQueryInput = new Option("q", ResponderProps.QUERYINPUT, true, "required -- Fully qualified dir in hdfs of Query files"); optionQueryInput.setRequired(false); - optionQueryInput.setArgName(QUERYINPUT); + optionQueryInput.setArgName(ResponderProps.QUERYINPUT); optionQueryInput.setType(String.class); options.addOption(optionQueryInput); // dataInputFormat - Option optionDataInputFormat = new Option("d", DATAINPUTFORMAT, true, "required -- 'base', 'elasticsearch', or 'standalone' : Specify the input format"); + Option optionDataInputFormat = new Option("d", ResponderProps.DATAINPUTFORMAT, true, + "required -- 'base', 'elasticsearch', or 'standalone' : Specify the input format"); optionDataInputFormat.setRequired(false); - optionDataInputFormat.setArgName(DATAINPUTFORMAT); + optionDataInputFormat.setArgName(ResponderProps.DATAINPUTFORMAT); optionDataInputFormat.setType(String.class); options.addOption(optionDataInputFormat); // inputData - Option optionInputData = new Option("i", INPUTDATA, true, 
"required -- Fully qualified name of input file/directory in hdfs; used if inputFormat = 'base'"); + Option optionInputData = new Option("i", ResponderProps.INPUTDATA, true, + "required -- Fully qualified name of input file/directory in hdfs; used if inputFormat = 'base'"); optionInputData.setRequired(false); - optionInputData.setArgName(INPUTDATA); + optionInputData.setArgName(ResponderProps.INPUTDATA); optionInputData.setType(String.class); options.addOption(optionInputData); // baseInputFormat - Option optionBaseInputFormat = new Option("bif", BASEINPUTFORMAT, true, + Option optionBaseInputFormat = new Option("bif", ResponderProps.BASEINPUTFORMAT, true, "required if baseInputFormat = 'base' -- Full class name of the InputFormat to use when reading in the data - must extend BaseInputFormat"); optionBaseInputFormat.setRequired(false); - optionBaseInputFormat.setArgName(BASEINPUTFORMAT); + optionBaseInputFormat.setArgName(ResponderProps.BASEINPUTFORMAT); optionBaseInputFormat.setType(String.class); options.addOption(optionBaseInputFormat); // baseQuery - Option optionBaseQuery = new Option("j", BASEQUERY, true, + Option optionBaseQuery = new Option("j", ResponderProps.BASEQUERY, true, "optional -- ElasticSearch-like query if using 'base' input format - used to filter records in the RecordReader"); optionBaseQuery.setRequired(false); - optionBaseQuery.setArgName(BASEQUERY); + optionBaseQuery.setArgName(ResponderProps.BASEQUERY); optionBaseQuery.setType(String.class); options.addOption(optionBaseQuery); // esResource - Option optionEsResource = new Option("er", ESRESOURCE, true, + Option optionEsResource = new Option("er", ResponderProps.ESRESOURCE, true, "required if baseInputFormat = 'elasticsearch' -- Requires the format <index>/<type> : Elasticsearch resource where data is read and written to"); optionEsResource.setRequired(false); - optionEsResource.setArgName(ESRESOURCE); + optionEsResource.setArgName(ResponderProps.ESRESOURCE); 
optionEsResource.setType(String.class); options.addOption(optionEsResource); // esQuery - Option optionEsQuery = new Option("eq", ESQUERY, true, + Option optionEsQuery = new Option("eq", ResponderProps.ESQUERY, true, "required if baseInputFormat = 'elasticsearch' -- ElasticSearch query if using 'elasticsearch' input format"); optionEsQuery.setRequired(false); - optionEsQuery.setArgName(ESQUERY); + optionEsQuery.setArgName(ResponderProps.ESQUERY); optionEsQuery.setType(String.class); options.addOption(optionEsQuery); // outputFile - Option optionOutputFile = new Option("o", OUTPUTFILE, true, "required -- Fully qualified name of output file in hdfs"); + Option optionOutputFile = new Option("o", ResponderProps.OUTPUTFILE, true, "required -- Fully qualified name of output file in hdfs"); optionOutputFile.setRequired(false); - optionOutputFile.setArgName(OUTPUTFILE); + optionOutputFile.setArgName(ResponderProps.OUTPUTFILE); optionOutputFile.setType(String.class); options.addOption(optionOutputFile); // stopListFile - Option optionStopListFile = new Option("sf", STOPLISTFILE, true, + Option optionStopListFile = new Option("sf", ResponderProps.STOPLISTFILE, true, "optional (unless using StopListFilter) -- Fully qualified file in hdfs containing stoplist terms; used by the StopListFilter"); optionStopListFile.setRequired(false); - optionStopListFile.setArgName(STOPLISTFILE); + optionStopListFile.setArgName(ResponderProps.STOPLISTFILE); optionStopListFile.setType(String.class); options.addOption(optionStopListFile); // numReduceTasks - Option optionNumReduceTasks = new Option("nr", NUMREDUCETASKS, true, "optional -- Number of reduce tasks"); + Option optionNumReduceTasks = new Option("nr", ResponderProps.NUMREDUCETASKS, true, "optional -- Number of reduce tasks"); optionNumReduceTasks.setRequired(false); - optionNumReduceTasks.setArgName(NUMREDUCETASKS); + optionNumReduceTasks.setArgName(ResponderProps.NUMREDUCETASKS); optionNumReduceTasks.setType(String.class); 
options.addOption(optionNumReduceTasks); // useLocalCache - Option optionUseLocalCache = new Option("ulc", USELOCALCACHE, true, + Option optionUseLocalCache = new Option("ulc", ResponderProps.USELOCALCACHE, true, "optional -- 'true' or 'false : Whether or not to use the local cache for modular exponentiation; Default is 'true'"); optionUseLocalCache.setRequired(false); - optionUseLocalCache.setArgName(USELOCALCACHE); + optionUseLocalCache.setArgName(ResponderProps.USELOCALCACHE); optionUseLocalCache.setType(String.class); options.addOption(optionUseLocalCache); // limitHitsPerSelector - Option optionLimitHitsPerSelector = new Option("lh", LIMITHITSPERSELECTOR, true, + Option optionLimitHitsPerSelector = new Option("lh", ResponderProps.LIMITHITSPERSELECTOR, true, "optional -- 'true' or 'false : Whether or not to limit the number of hits per selector; Default is 'true'"); optionLimitHitsPerSelector.setRequired(false); - optionLimitHitsPerSelector.setArgName(LIMITHITSPERSELECTOR); + optionLimitHitsPerSelector.setArgName(ResponderProps.LIMITHITSPERSELECTOR); optionLimitHitsPerSelector.setType(String.class); options.addOption(optionLimitHitsPerSelector); // maxHitsPerSelector - Option optionMaxHitsPerSelector = new Option("mh", MAXHITSPERSELECTOR, true, "optional -- Max number of hits encrypted per selector"); + Option optionMaxHitsPerSelector = new Option("mh", ResponderProps.MAXHITSPERSELECTOR, true, "optional -- Max number of hits encrypted per selector"); optionMaxHitsPerSelector.setRequired(false); - optionMaxHitsPerSelector.setArgName(MAXHITSPERSELECTOR); + optionMaxHitsPerSelector.setArgName(ResponderProps.MAXHITSPERSELECTOR); optionMaxHitsPerSelector.setType(String.class); options.addOption(optionMaxHitsPerSelector); // mapreduce.map.memory.mb - Option optionMapMemory = new Option("mm", MAPMEMORY, true, "optional -- Amount of memory (in MB) to allocate per map task; Default is 3000"); + Option optionMapMemory = new Option("mm", ResponderProps.MAPMEMORY, true, 
"optional -- Amount of memory (in MB) to allocate per map task; Default is 3000"); optionMapMemory.setRequired(false); - optionMapMemory.setArgName(MAPMEMORY); + optionMapMemory.setArgName(ResponderProps.MAPMEMORY); optionMapMemory.setType(String.class); options.addOption(optionMapMemory); // mapreduce.reduce.memory.mb - Option optionReduceMemory = new Option("rm", REDUCEMEMORY, true, "optional -- Amount of memory (in MB) to allocate per reduce task; Default is 3000"); + Option optionReduceMemory = new Option("rm", ResponderProps.REDUCEMEMORY, true, + "optional -- Amount of memory (in MB) to allocate per reduce task; Default is 3000"); optionReduceMemory.setRequired(false); - optionReduceMemory.setArgName(REDUCEMEMORY); + optionReduceMemory.setArgName(ResponderProps.REDUCEMEMORY); optionReduceMemory.setType(String.class); options.addOption(optionReduceMemory); // mapreduce.map.java.opts - Option optionMapOpts = new Option("mjo", MAPJAVAOPTS, true, "optional -- Amount of heap (in MB) to allocate per map task; Default is -Xmx2800m"); + Option optionMapOpts = new Option("mjo", ResponderProps.MAPJAVAOPTS, true, + "optional -- Amount of heap (in MB) to allocate per map task; Default is -Xmx2800m"); optionMapOpts.setRequired(false); - optionMapOpts.setArgName(MAPJAVAOPTS); + optionMapOpts.setArgName(ResponderProps.MAPJAVAOPTS); optionMapOpts.setType(String.class); options.addOption(optionMapOpts); // mapreduce.reduce.java.opts - Option optionReduceOpts = new Option("rjo", REDUCEJAVAOPTS, true, "optional -- Amount of heap (in MB) to allocate per reduce task; Default is -Xmx2800m"); + Option optionReduceOpts = new Option("rjo", ResponderProps.REDUCEJAVAOPTS, true, + "optional -- Amount of heap (in MB) to allocate per reduce task; Default is -Xmx2800m"); optionReduceOpts.setRequired(false); - optionReduceOpts.setArgName(REDUCEJAVAOPTS); + optionReduceOpts.setArgName(ResponderProps.REDUCEJAVAOPTS); optionReduceOpts.setType(String.class); options.addOption(optionReduceOpts); 
// data.schemas - Option optionDataSchemas = new Option("ds", DATASCHEMAS, true, "required -- Comma separated list of data schema file names"); + Option optionDataSchemas = new Option("ds", ResponderProps.DATASCHEMAS, true, "required -- Comma separated list of data schema file names"); optionDataSchemas.setRequired(false); - optionDataSchemas.setArgName(DATASCHEMAS); + optionDataSchemas.setArgName(ResponderProps.DATASCHEMAS); optionDataSchemas.setType(String.class); options.addOption(optionDataSchemas); // query.schemas - Option optionQuerySchemas = new Option("qs", QUERYSCHEMAS, true, "required -- Comma separated list of query schema file names"); + Option optionQuerySchemas = new Option("qs", ResponderProps.QUERYSCHEMAS, true, "required -- Comma separated list of query schema file names"); optionQuerySchemas.setRequired(false); - optionQuerySchemas.setArgName(QUERYSCHEMAS); + optionQuerySchemas.setArgName(ResponderProps.QUERYSCHEMAS); optionQuerySchemas.setType(String.class); options.addOption(optionQuerySchemas); // pir.numExpLookupPartitions - Option optionExpParts = new Option("expParts", NUMEXPLOOKUPPARTS, true, "optional -- Number of partitions for the exp lookup table"); + Option optionExpParts = new Option("expParts", ResponderProps.NUMEXPLOOKUPPARTS, true, "optional -- Number of partitions for the exp lookup table"); optionExpParts.setRequired(false); - optionExpParts.setArgName(NUMEXPLOOKUPPARTS); + optionExpParts.setArgName(ResponderProps.NUMEXPLOOKUPPARTS); optionExpParts.setType(String.class); options.addOption(optionExpParts); // pir.numExpLookupPartitions - Option optionHdfsExp = new Option("hdfsExp", USEHDFSLOOKUPTABLE, true, + Option optionHdfsExp = new Option("hdfsExp", ResponderProps.USEHDFSLOOKUPTABLE, true, "optional -- 'true' or 'false' - Whether or not to generate and use the hdfs lookup table" + " for modular exponentiation"); optionHdfsExp.setRequired(false); - optionHdfsExp.setArgName(USEHDFSLOOKUPTABLE); + 
optionHdfsExp.setArgName(ResponderProps.USEHDFSLOOKUPTABLE); optionHdfsExp.setType(String.class); options.addOption(optionHdfsExp); // numDataPartitions - Option optionDataParts = new Option("dataParts", NUMDATAPARTITIONS, true, "optional -- Number of partitions for the input data"); + Option optionDataParts = new Option("dataParts", ResponderProps.NUMDATAPARTITIONS, true, "optional -- Number of partitions for the input data"); optionDataParts.setRequired(false); - optionDataParts.setArgName(NUMDATAPARTITIONS); + optionDataParts.setArgName(ResponderProps.NUMDATAPARTITIONS); optionDataParts.setType(String.class); options.addOption(optionDataParts); // useModExpJoin - Option optionModExpJoin = new Option("useModExpJoin", USEMODEXPJOIN, true, "optional -- 'true' or 'false' -- Spark only -- Whether or not to " - + "pre-compute the modular exponentiation table and join it to the data partitions when performing the encrypted row calculations"); + Option optionModExpJoin = new Option("useModExpJoin", ResponderProps.USEMODEXPJOIN, true, + "optional -- 'true' or 'false' -- Spark only -- Whether or not to " + + "pre-compute the modular exponentiation table and join it to the data partitions when performing the encrypted row calculations"); optionModExpJoin.setRequired(false); - optionModExpJoin.setArgName(USEMODEXPJOIN); + optionModExpJoin.setArgName(ResponderProps.USEMODEXPJOIN); optionModExpJoin.setType(String.class); options.addOption(optionModExpJoin); // numColMultPartitions - Option optionNumColMultPartitions = new Option("numColMultParts", NUMCOLMULTPARTITIONS, true, "optional, Spark only -- Number of partitions to " - + "use when performing column multiplication"); + Option optionNumColMultPartitions = new Option("numColMultParts", ResponderProps.NUMCOLMULTPARTITIONS, true, + "optional, Spark only -- Number of partitions to " + "use when performing column multiplication"); optionNumColMultPartitions.setRequired(false); - 
optionNumColMultPartitions.setArgName(NUMCOLMULTPARTITIONS); + optionNumColMultPartitions.setArgName(ResponderProps.NUMCOLMULTPARTITIONS); optionNumColMultPartitions.setType(String.class); options.addOption(optionNumColMultPartitions); // colMultReduceByKey - Option optionColMultReduceByKey = new Option("colMultRBK", COLMULTREDUCEBYKEY, true, "optional -- 'true' or 'false' -- Spark only -- " + Option optionColMultReduceByKey = new Option("colMultRBK", ResponderProps.COLMULTREDUCEBYKEY, true, "optional -- 'true' or 'false' -- Spark only -- " + "If true, uses reduceByKey in performing column multiplication; if false, uses groupByKey -> reduce"); optionColMultReduceByKey.setRequired(false); - optionColMultReduceByKey.setArgName(COLMULTREDUCEBYKEY); + optionColMultReduceByKey.setArgName(ResponderProps.COLMULTREDUCEBYKEY); optionColMultReduceByKey.setType(String.class); options.addOption(optionColMultReduceByKey); - // colMultReduceByKey - Option optionAllowEmbeddedQS = new Option("allowEmbeddedQS", ALLOWEMBEDDEDQUERYSCHEMAS, true, "optional -- 'true' or 'false' (defaults to 'false') -- " - + "If true, allows embedded QuerySchemas for a query."); + // allowEmbeddedQS + Option optionAllowEmbeddedQS = new Option("allowEmbeddedQS", ResponderProps.ALLOWEMBEDDEDQUERYSCHEMAS, true, + "optional -- 'true' or 'false' (defaults to 'false') -- " + "If true, allows embedded QuerySchemas for a query."); optionAllowEmbeddedQS.setRequired(false); - optionAllowEmbeddedQS.setArgName(ALLOWEMBEDDEDQUERYSCHEMAS); + optionAllowEmbeddedQS.setArgName(ResponderProps.ALLOWEMBEDDEDQUERYSCHEMAS); optionAllowEmbeddedQS.setType(String.class); options.addOption(optionAllowEmbeddedQS); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java 
b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java index 61dbb23..da24ae4 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java @@ -27,6 +27,8 @@ import org.apache.pirk.responder.wideskies.spark.ComputeResponse; import org.apache.pirk.responder.wideskies.standalone.Responder; import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Driver class for the responder @@ -42,28 +44,30 @@ import org.apache.pirk.utils.SystemConfiguration; */ public class ResponderDriver { + private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class); + public static void main(String[] args) throws Exception { ResponderCLI responderCLI = new ResponderCLI(args); - if (responderCLI.getOptionValue(ResponderCLI.PLATFORM).equals("mapreduce")) + if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce")) { - System.out.println("Launching MapReduce ResponderTool:"); + logger.info("Launching MapReduce ResponderTool:"); ComputeResponseTool pirWLTool = new ComputeResponseTool(); ToolRunner.run(pirWLTool, new String[] {}); } - else if (responderCLI.getOptionValue(ResponderCLI.PLATFORM).equals("spark")) + else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark")) { - System.out.println("Launching Spark ComputeResponse:"); + logger.info("Launching Spark ComputeResponse:"); FileSystem fs = FileSystem.get(new Configuration()); ComputeResponse computeResponse = new ComputeResponse(fs); computeResponse.performQuery(); } - else if (responderCLI.getOptionValue(ResponderCLI.PLATFORM).equals("standalone")) + else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone")) { - System.out.println("Launching Standalone Responder:"); + logger.info("Launching Standalone Responder:"); 
String queryInput = SystemConfiguration.getProperty("pir.queryInput"); Query query = new LocalFileSystemStore().recall(queryInput, Query.class); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java new file mode 100644 index 0000000..1f2130b --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pirk.responder.wideskies;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.pirk.inputformat.hadoop.InputFormatConst;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Properties constants and validation for the Responder.
+ * <p>
+ * All values are read from {@link SystemConfiguration}. Defaults for absent optional properties are written back to it.
+ */
+public class ResponderProps
+{
+  private static final Logger logger = LoggerFactory.getLogger(ResponderProps.class);
+
+  // Required properties
+  public static final String PLATFORM = "platform";
+  public static final String QUERYINPUT = "pir.queryInput";
+  public static final String DATAINPUTFORMAT = "pir.dataInputFormat";
+  public static final String OUTPUTFILE = "pir.outputFile";
+
+  // Optional properties
+  public static final String INPUTDATA = "pir.inputData";
+  public static final String BASEQUERY = "pir.baseQuery";
+  public static final String ESRESOURCE = "pir.esResource";
+  public static final String ESQUERY = "pir.esQuery";
+  public static final String BASEINPUTFORMAT = "pir.baseInputFormat";
+  public static final String STOPLISTFILE = "pir.stopListFile";
+  public static final String NUMREDUCETASKS = "pir.numReduceTasks";
+  public static final String USELOCALCACHE = "pir.useLocalCache";
+  public static final String LIMITHITSPERSELECTOR = "pir.limitHitsPerSelector";
+  public static final String MAXHITSPERSELECTOR = "pir.maxHitsPerSelector";
+  public static final String MAPMEMORY = "mapreduce.map.memory.mb";
+  public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+  public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+  public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+  public static final String QUERYSCHEMAS = "responder.querySchemas";
+  public static final String DATASCHEMAS = "responder.dataSchemas";
+  public static final String NUMEXPLOOKUPPARTS = "pir.numExpLookupPartitions";
+  public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+  public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+  public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
+  public static final String USEMODEXPJOIN = "pir.useModExpJoin";
+  public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
+  public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+
+  // Read-only view: this is shared global state, so callers must not be able to modify it
+  public static final List<String> PROPSLIST = Collections.unmodifiableList(Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY,
+      ESRESOURCE, ESQUERY, OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY,
+      REDUCEMEMORY, MAPJAVAOPTS, REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS,
+      USEMODEXPJOIN, COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS));
+
+  /**
+   * Validates the responder properties held in {@link SystemConfiguration}.
+   * <p>
+   * Performs the following steps:
+   * <ul>
+   * <li>Checks that the general required properties are present.</li>
+   * <li>Checks the properties required by the configured data input format.</li>
+   * <li>Appends any responder-specific schema lists to {@code data.schemas} / {@code query.schemas}.</li>
+   * <li>Fills in defaults for absent optional properties.</li>
+   * <li>If everything is valid so far, initializes the data and query schema loaders.</li>
+   * </ul>
+   * Every problem found is logged; checks continue after a failure so all problems are reported at once.
+   *
+   * @return {@code true} if all required properties are present and valid and the schemas loaded successfully, {@code false} otherwise
+   */
+  public static boolean validateResponderProperties()
+  {
+    boolean valid = true;
+
+    // Parse general required options
+    if (checkRequired(PLATFORM, "Must have the option " + PLATFORM))
+    {
+      // Locale.ROOT so matching does not depend on the JVM's default locale
+      String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase(Locale.ROOT);
+      if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
+      {
+        logger.error("Unsupported platform: " + platform);
+        valid = false;
+      }
+    }
+    else
+    {
+      valid = false;
+    }
+
+    // Non-short-circuit &= so that every missing option is logged
+    valid &= checkRequired(QUERYINPUT, "Must have the option " + QUERYINPUT);
+    valid &= checkRequired(OUTPUTFILE, "Must have the option " + OUTPUTFILE);
+
+    // Parse required properties by dataInputFormat; only possible when the format itself is set
+    if (checkRequired(DATAINPUTFORMAT, "Must have the option " + DATAINPUTFORMAT))
+    {
+      String dataInputFormat = SystemConfiguration.getProperty(DATAINPUTFORMAT).toLowerCase(Locale.ROOT);
+      valid &= validateDataInputFormatProperties(dataInputFormat);
+    }
+    else
+    {
+      valid = false;
+    }
+
+    // Append any responder-specific schema files to the lists read by the schema loaders
+    if (SystemConfiguration.hasProperty(QUERYSCHEMAS))
+    {
+      SystemConfiguration.appendProperty("query.schemas", SystemConfiguration.getProperty(QUERYSCHEMAS));
+    }
+    if (SystemConfiguration.hasProperty(DATASCHEMAS))
+    {
+      SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS));
+    }
+
+    // Fill in defaults for optional properties that were not supplied
+    setDefault(USEHDFSLOOKUPTABLE, "false");
+    setDefault(USEMODEXPJOIN, "false");
+    setDefault(NUMDATAPARTITIONS, "1000");
+    setDefault(NUMCOLMULTPARTITIONS, "1000");
+    setDefault(COLMULTREDUCEBYKEY, "false");
+    setDefault(ALLOWEMBEDDEDQUERYSCHEMAS, "false");
+    setDefault(USELOCALCACHE, "true");
+
+    // Load the new local query and data schemas
+    if (valid)
+    {
+      valid = loadSchemas();
+    }
+
+    return valid;
+  }
+
+  /**
+   * Checks the properties that are required by the given (lower-cased) data input format.
+   *
+   * @param dataInputFormat
+   *          the value of {@link #DATAINPUTFORMAT}, already lower-cased
+   * @return {@code true} if the format is supported and all of its required properties are present
+   */
+  private static boolean validateDataInputFormatProperties(String dataInputFormat)
+  {
+    boolean valid = true;
+
+    if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+    {
+      valid &= checkRequired(BASEINPUTFORMAT, "For base inputformat: Must have the option " + BASEINPUTFORMAT + " if using " + InputFormatConst.BASE_FORMAT);
+      valid &= checkRequired(INPUTDATA, "For base inputformat: Must have the option " + INPUTDATA + " if using " + InputFormatConst.BASE_FORMAT);
+
+      // Use the constant key, not the literal "BASEQUERY", so the default is actually picked up as pir.baseQuery
+      setDefault(BASEQUERY, "?q=*");
+    }
+    else if (dataInputFormat.equals(InputFormatConst.ES))
+    {
+      valid &= checkRequired(ESRESOURCE, "For ElasticSearch inputformat: Must have the option " + ESRESOURCE);
+      valid &= checkRequired(ESQUERY, "For ElasticSearch inputformat: Must have the option " + ESQUERY);
+    }
+    else if (dataInputFormat.equals("standalone"))
+    {
+      valid &= checkRequired(INPUTDATA, "Must have the option " + INPUTDATA + " if using standalone");
+    }
+    else
+    {
+      logger.error("Unsupported inputFormat = " + dataInputFormat);
+      valid = false;
+    }
+
+    return valid;
+  }
+
+  /**
+   * Returns whether {@code property} is set in {@link SystemConfiguration}, logging {@code message} as an error when it is not.
+   */
+  private static boolean checkRequired(String property, String message)
+  {
+    if (SystemConfiguration.hasProperty(property))
+    {
+      return true;
+    }
+    logger.error(message);
+    return false;
+  }
+
+  /**
+   * Sets {@code property} to {@code defaultValue} in {@link SystemConfiguration} unless the property is already present.
+   */
+  private static void setDefault(String property, String defaultValue)
+  {
+    if (!SystemConfiguration.hasProperty(property))
+    {
+      SystemConfiguration.setProperty(property, defaultValue);
+    }
+  }
+
+  /**
+   * Initializes the data and query schema loaders from the {@code data.schemas} and {@code query.schemas} properties.
+   *
+   * @return {@code true} if both loaders initialized; {@code false} (after logging the cause) if either threw
+   */
+  private static boolean loadSchemas()
+  {
+    logger.info("loading schemas: dataSchemas = " + SystemConfiguration.getProperty("data.schemas") + " querySchemas = "
+        + SystemConfiguration.getProperty("query.schemas"));
+    try
+    {
+      DataSchemaLoader.initialize();
+      QuerySchemaLoader.initialize();
+      return true;
+    } catch (Exception e)
+    {
+      // Boundary catch: the loaders' checked exceptions are not visible here; a failed load makes the configuration invalid
+      logger.error("Failed to load the data and query schemas", e);
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index cfe4a68..a14664c 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -183,6 +183,7 @@ public class ComputeResponse { qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType()); } + DataSchema dSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName()); bVars.setQuerySchema(qSchema); bVars.setDataSchema(dSchema); @@ -202,7 +203,7 @@ public
class ComputeResponse numColMultPartitions = Integer.parseInt(SystemConfiguration.getProperty("pir.numColMultPartitions", numDataPartsString)); // Whether or not we are performing a reduceByKey or a groupByKey->reduce for column multiplication - colMultReduceByKey = SystemConfiguration.getProperty("pir.colMultReduceByKey").equals("true"); + colMultReduceByKey = SystemConfiguration.getProperty("pir.colMultReduceByKey", "false").equals("true"); // Set the expDir bVars.setExpDir(outputDirExp); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java index fb67445..44f99b7 100644 --- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java @@ -45,7 +45,7 @@ import org.xml.sax.SAXException; /** * Class to load any data schemas specified in the properties file, 'data.schemas' * <p> - * Schemas should be specified as follows; all items are treated in a case insensitive manner: + * Schemas should be specified as follows: * * <pre> * {@code http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java index d767f2d..00d2d0c 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java @@ -49,7 +49,7 @@ import org.xml.sax.SAXException; /** * Class to load any query schemas specified in the properties file, 'query.schemas' * <p> - * Schemas 
should be specified as follows; all items are treated in a case insensitive manner: + * Schemas should be specified as follows: * * <pre> * {@code http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java index 15d7622..2970d43 100644 --- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -32,7 +32,7 @@ import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse; import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; -import org.apache.pirk.responder.wideskies.ResponderCLI; +import org.apache.pirk.responder.wideskies.ResponderProps; import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.response.QueryResponseJSON; @@ -226,15 +226,15 @@ public class DistTestSuite // Test embedded QuerySchema SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); - BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); - BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); 
SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); - BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); // Test pad columns @@ -402,30 +402,30 @@ public class DistTestSuite String inputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); logger.info("inputFormat = " + inputFormat); ArrayList<String> args = new ArrayList<>(); - args.add("-" + ResponderCLI.PLATFORM + "=spark"); - args.add("-" + ResponderCLI.DATAINPUTFORMAT + "=" + inputFormat); - args.add("-" + ResponderCLI.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput")); - args.add("-" + ResponderCLI.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile")); - args.add("-" + ResponderCLI.STOPLISTFILE + "=" + SystemConfiguration.getProperty("pir.stopListFile")); - args.add("-" + ResponderCLI.USELOCALCACHE + "=" + SystemConfiguration.getProperty("pir.useLocalCache", "true")); - args.add("-" + ResponderCLI.LIMITHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.limitHitsPerSelector", "false")); - args.add("-" + ResponderCLI.MAXHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.maxHitsPerSelector", "1000")); - args.add("-" + ResponderCLI.QUERYSCHEMAS + "=" + Inputs.HDFS_QUERY_FILES); - args.add("-" + ResponderCLI.DATASCHEMAS + "=" + Inputs.DATA_SCHEMA_FILE_HDFS); - args.add("-" + ResponderCLI.NUMEXPLOOKUPPARTS + "=" + SystemConfiguration.getProperty("pir.numExpLookupPartitions", "100")); - args.add("-" + ResponderCLI.USEMODEXPJOIN + "=" + SystemConfiguration.getProperty("pir.useModExpJoin", "false")); - args.add("-" + ResponderCLI.NUMCOLMULTPARTITIONS + "=" + SystemConfiguration.getProperty("pir.numColMultPartitions", "20")); - args.add("-" + ResponderCLI.COLMULTREDUCEBYKEY + "=" + SystemConfiguration.getProperty("pir.colMultReduceByKey", "false")); + args.add("-" + ResponderProps.PLATFORM + 
"=spark"); + args.add("-" + ResponderProps.DATAINPUTFORMAT + "=" + inputFormat); + args.add("-" + ResponderProps.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput")); + args.add("-" + ResponderProps.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile")); + args.add("-" + ResponderProps.STOPLISTFILE + "=" + SystemConfiguration.getProperty("pir.stopListFile")); + args.add("-" + ResponderProps.USELOCALCACHE + "=" + SystemConfiguration.getProperty("pir.useLocalCache", "true")); + args.add("-" + ResponderProps.LIMITHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.limitHitsPerSelector", "false")); + args.add("-" + ResponderProps.MAXHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.maxHitsPerSelector", "1000")); + args.add("-" + ResponderProps.QUERYSCHEMAS + "=" + Inputs.HDFS_QUERY_FILES); + args.add("-" + ResponderProps.DATASCHEMAS + "=" + Inputs.DATA_SCHEMA_FILE_HDFS); + args.add("-" + ResponderProps.NUMEXPLOOKUPPARTS + "=" + SystemConfiguration.getProperty("pir.numExpLookupPartitions", "100")); + args.add("-" + ResponderProps.USEMODEXPJOIN + "=" + SystemConfiguration.getProperty("pir.useModExpJoin", "false")); + args.add("-" + ResponderProps.NUMCOLMULTPARTITIONS + "=" + SystemConfiguration.getProperty("pir.numColMultPartitions", "20")); + args.add("-" + ResponderProps.COLMULTREDUCEBYKEY + "=" + SystemConfiguration.getProperty("pir.colMultReduceByKey", "false")); if (inputFormat.equals(InputFormatConst.BASE_FORMAT)) { - args.add("-" + ResponderCLI.INPUTDATA + "=" + SystemConfiguration.getProperty("pir.inputData")); - args.add("-" + ResponderCLI.BASEQUERY + "=" + SystemConfiguration.getProperty("pir.baseQuery")); - args.add("-" + ResponderCLI.BASEINPUTFORMAT + "=" + SystemConfiguration.getProperty("pir.baseInputFormat")); + args.add("-" + ResponderProps.INPUTDATA + "=" + SystemConfiguration.getProperty("pir.inputData")); + args.add("-" + ResponderProps.BASEQUERY + "=" + 
SystemConfiguration.getProperty("pir.baseQuery")); + args.add("-" + ResponderProps.BASEINPUTFORMAT + "=" + SystemConfiguration.getProperty("pir.baseInputFormat")); } else if (inputFormat.equals(InputFormatConst.ES)) { - args.add("-" + ResponderCLI.ESQUERY + "=" + SystemConfiguration.getProperty("pir.esQuery")); - args.add("-" + ResponderCLI.ESRESOURCE + "=" + SystemConfiguration.getProperty("pir.esResource")); + args.add("-" + ResponderProps.ESQUERY + "=" + SystemConfiguration.getProperty("pir.esQuery")); + args.add("-" + ResponderProps.ESRESOURCE + "=" + SystemConfiguration.getProperty("pir.esResource")); } for (String arg : args) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/1b38ea67/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index be4d848..bdca952 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -408,7 +408,7 @@ public class Inputs logger.info("pirStopListFile file successfully created!"); } - tmpFileName = TestUtils.writeToTmpFile(elements, SystemConfiguration.getProperty("pir.stopListFile"), null); + tmpFileName = TestUtils.writeToTmpFile(elements, SystemConfiguration.getProperty(DistributedTestDriver.PIR_STOPLIST_FILE), null); return tmpFileName; }
