Repository: incubator-pirk Updated Branches: refs/heads/master 374d86c34 -> 99f9c624f
[PIRK-17] - Add Ability to Embed QuerySchema in Query, plus minor misc cleanup - closes apache/incubator-pirk#23 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/99f9c624 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/99f9c624 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/99f9c624 Branch: refs/heads/master Commit: 99f9c624ffa4abbeb6c3fb0083065465794fe790 Parents: 374d86c Author: eawilliams <[email protected]> Authored: Sat Jul 23 22:18:46 2016 -0400 Committer: eawilliams <[email protected]> Committed: Sat Jul 23 22:18:46 2016 -0400 ---------------------------------------------------------------------- pom.xml | 2 +- .../pirk/querier/wideskies/QuerierDriver.java | 6 +++ .../querier/wideskies/QuerierDriverCLI.java | 19 ++++++++++ .../decrypt/DecryptResponseRunnable.java | 12 +++++- .../querier/wideskies/encrypt/EncryptQuery.java | 5 +-- .../apache/pirk/query/wideskies/QueryInfo.java | 17 ++++++++- .../apache/pirk/query/wideskies/QueryUtils.java | 16 ++------ .../pirk/responder/wideskies/ResponderCLI.java | 30 ++++++++++++--- .../wideskies/common/ComputeEncryptedRow.java | 16 +++----- .../common/HashSelectorAndPartitionData.java | 6 +-- .../wideskies/mapreduce/ColumnMultReducer.java | 1 - .../mapreduce/ComputeResponseTool.java | 10 ++++- .../HashSelectorsAndPartitionDataMapper.java | 11 +++++- .../wideskies/mapreduce/RowCalcReducer.java | 5 --- .../wideskies/spark/BroadcastVars.java | 26 +++++++++++++ .../wideskies/spark/ComputeResponse.java | 14 +++++++ .../responder/wideskies/spark/EncRowCalc.java | 19 +++++----- .../spark/EncRowCalcPrecomputedCache.java | 25 ++++--------- .../spark/HashSelectorsAndPartitionData.java | 16 +++++--- .../wideskies/standalone/Responder.java | 16 +++++++- .../pirk/schema/data/LoadDataSchemas.java | 2 +- .../apache/pirk/schema/query/QuerySchema.java | 2 +- .../distributed/testsuite/DistTestSuite.java | 39 ++++++++++++++++++++ .../org/apache/pirk/test/utils/BaseTests.java | 25 +++++++++---- .../java/org/apache/pirk/test/utils/Inputs.java | 2 +- .../apache/pirk/test/utils/StandaloneQuery.java | 10 ++++- .../org/apache/pirk/utils/QueryParserUtils.java | 4 +- .../java/org/apache/pirk/utils/StringUtils.java | 4 +- .../apache/pirk/utils/SystemConfiguration.java | 2 +- src/main/resources/log4j2.properties | 4 +- src/main/resources/pirk.properties | 8 ++-- .../java/test/general/PartitionUtilsTest.java | 1 + .../java/test/general/QueryParserUtilsTest.java | 1 + .../test/schema/query/LoadQuerySchemaTest.java | 2 + .../wideskies/standalone/StandaloneTest.java | 17 +++++++++ 35 files changed, 291 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ec9471e..5b31b8c 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ <artifactId>spark-core_2.11</artifactId> <version>1.6.1</version> </dependency> - + <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java 
b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java index 8f287fd..efa0533 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java @@ -164,6 +164,12 @@ public class QuerierDriver implements Serializable // Set the necessary QueryInfo and Paillier objects QueryInfo queryInfo = new QueryInfo(queryNum, numSelectors, hashBitSize, hashKey, dataPartitionBitSize, queryType, queryName, paillierBitSize, useMemLookupTable, embedSelector, useHDFSLookupTable); + + if (SystemConfiguration.getProperty("pir.embedQuerySchema").equals("true")) + { + queryInfo.addQuerySchema(LoadQuerySchemas.getSchema(queryType)); + } + Paillier paillier = new Paillier(paillierBitSize, certainty, bitSet); // throws PIRException if certainty conditions are not satisfied // Check the number of selectors to ensure that 2^{numSelector*dataPartitionBitSize} < N http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java index 9012b51..193617a 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriverCLI.java @@ -46,6 +46,7 @@ public class QuerierDriverCLI public static String OUTPUTFILE = "outputFile"; public static String TYPE = "queryType"; public static String NUMTHREADS = "numThreads"; + public static String EMBEDQUERYSCHEMA = "embedQuerySchema"; // Encryption variables public static String HASHBITSIZE = "hashBitSize"; @@ -183,6 +184,16 @@ public class QuerierDriverCLI } SystemConfiguration.setProperty("data.schemas", getOptionValue(DATASCHEMAS)); + // Parse general optional args + if (hasOption(EMBEDQUERYSCHEMA)) + { + SystemConfiguration.setProperty("pir.embedQuerySchema", getOptionValue(EMBEDQUERYSCHEMA)); + } + else + { + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + } + // Parse encryption args if (action.equals("encrypt")) { @@ -469,6 +480,14 @@ public class QuerierDriverCLI optionQUERIERFILE.setType(String.class); options.addOption(optionQUERIERFILE); + // embedQuerySchema + Option optionEMBEDQUERYSCHEMA = new Option("embedQS", EMBEDQUERYSCHEMA, true, + "optional (defaults to false) -- Whether or not to embed the QuerySchema in the Query (via QueryInfo)"); + optionEMBEDQUERYSCHEMA.setRequired(false); + optionEMBEDQUERYSCHEMA.setArgName(EMBEDQUERYSCHEMA); + optionEMBEDQUERYSCHEMA.setType(String.class); + options.addOption(optionEMBEDQUERYSCHEMA); + // SR_ALGORITHM Option optionSR_ALGORITHM = new Option("srAlg", SR_ALGORITHM, true, "optional - specify the SecureRandom algorithm, defaults to NativePRNG"); optionSR_ALGORITHM.setRequired(false); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseRunnable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseRunnable.java b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseRunnable.java index 35482c3..ea9d6bb 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseRunnable.java +++
b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponseRunnable.java @@ -28,6 +28,7 @@ import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,8 @@ public class DecryptResponseRunnable implements Runnable private TreeMap<Integer,String> selectors = null; private HashMap<String,BigInteger> selectorMaskMap = null; private QueryInfo queryInfo = null; + private QuerySchema qSchema = null; + private HashMap<Integer,String> embedSelectorMap = null; public DecryptResponseRunnable(ArrayList<BigInteger> rElementsInput, TreeMap<Integer,String> selectorsInput, HashMap<String,BigInteger> selectorMaskMapInput, @@ -58,6 +61,13 @@ public class DecryptResponseRunnable implements Runnable queryInfo = queryInfoInput; embedSelectorMap = embedSelectorMapInput; + if (SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true")) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + } resultMap = new HashMap<>(); } @@ -132,7 +142,7 @@ public class DecryptResponseRunnable implements Runnable QueryResponseJSON qrJOSN = null; try { - qrJOSN = QueryUtils.extractQueryResponseJSON(queryInfo, parts); + qrJOSN = QueryUtils.extractQueryResponseJSON(queryInfo, qSchema, parts); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index a277c46..b3c9be9 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -18,8 +18,6 @@ */ package org.apache.pirk.querier.wideskies.encrypt; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -29,7 +27,6 @@ import java.util.concurrent.TimeUnit; import org.apache.pirk.encryption.Paillier; import org.apache.pirk.querier.wideskies.Querier; -import org.apache.pirk.querier.wideskies.QuerierConst; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; @@ -203,7 +200,7 @@ public class EncryptQuery // Encrypt and form the query vector ExecutorService es = Executors.newCachedThreadPool(); ArrayList<EncryptQueryRunnable> runnables = new ArrayList<>(numThreads); - int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize + int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize // Split the work across the requested number of threads int elementsPerThread = numElements / numThreads; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java index 3ec73ea..0822ca9 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java +++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java @@ -21,6 +21,7 @@ package org.apache.pirk.query.wideskies; import java.io.Serializable; import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,8 @@ public class QueryInfo implements Serializable // false positive rate for variable length selectors and a zero false positive rate // for selectors of fixed size < 32 bits + QuerySchema qSchema = null; + public QueryInfo(double queryNumInput, int numSelectorsInput, int hashBitSizeInput, String hashKeyInput, int dataPartitionBitSizeInput, String queryTypeInput, String queryNameInput, int paillierBitSizeIn, boolean useExpLookupTableInput, boolean embedSelectorInput, boolean useHDFSExpLookupTableInput) @@ -166,6 +169,16 @@ public class QueryInfo implements Serializable return embedSelector; } + public void addQuerySchema(QuerySchema qSchemaIn) + { + qSchema = qSchemaIn; + } + + public QuerySchema getQuerySchema() + { + return qSchema; + } + public void printQueryInfo() { logger.info("queryNum = " + queryNum + " numSelectors = " + numSelectors + " hashBitSize = " + hashBitSize + " hashKey = " + hashKey @@ -176,7 +189,7 @@ public class QueryInfo implements Serializable public QueryInfo copy() { - return new QueryInfo(this.queryNum, this.numSelectors, this.hashBitSize, this.hashKey, this.dataPartitionBitSize, this.queryType, - this.queryName, this.paillierBitSize, this.useExpLookupTable, this.embedSelector, this.useHDFSExpLookupTable); + return new QueryInfo(this.queryNum, this.numSelectors, this.hashBitSize, this.hashKey, this.dataPartitionBitSize, this.queryType, this.queryName, + this.paillierBitSize, this.useExpLookupTable, this.embedSelector, this.useHDFSExpLookupTable); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java b/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java index 0bf2fb7..2c67866 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java +++ b/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java @@ -32,7 +32,6 @@ import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; -import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.utils.KeyedHash; @@ -53,12 +52,10 @@ public class QueryUtils /** * Method to convert the given BigInteger raw data element partitions to a QueryResponseJSON object based upon the given queryType */ - public static QueryResponseJSON extractQueryResponseJSON(QueryInfo queryInfo, ArrayList<BigInteger> parts) throws Exception + public static QueryResponseJSON extractQueryResponseJSON(QueryInfo queryInfo, QuerySchema qSchema, ArrayList<BigInteger> parts) throws Exception { QueryResponseJSON qrJSON = new QueryResponseJSON(queryInfo); - String queryType = queryInfo.getQueryType(); - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType); DataSchema dSchema = 
LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); int numArrayElementsToReturn = Integer.parseInt(SystemConfiguration.getProperty("pir.numReturnArrayElements", "1")); @@ -107,13 +104,9 @@ public class QueryUtils /** * Method to convert the given data element given by the JSONObject data element into the extracted BigInteger partitions based upon the given queryType */ - public static ArrayList<BigInteger> partitionDataElement(String queryType, JSONObject jsonData, boolean embedSelector) throws Exception + public static ArrayList<BigInteger> partitionDataElement(QuerySchema qSchema, JSONObject jsonData, boolean embedSelector) throws Exception { ArrayList<BigInteger> parts = new ArrayList<>(); - - logger.debug("queryType = " + queryType); - - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType); DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); // Add the embedded selector to the parts @@ -121,7 +114,7 @@ public class QueryUtils { String selectorFieldName = qSchema.getSelectorName(); String type = dSchema.getElementType(selectorFieldName); - String selector = getSelectorByQueryTypeJSON(queryType, jsonData); + String selector = getSelectorByQueryTypeJSON(qSchema, jsonData); parts.addAll(embeddedSelectorToPartitions(selector, type, (dSchema.getPartitionerForElement(selectorFieldName)))); @@ -338,11 +331,10 @@ public class QueryUtils * <p> * Pulls first element of array if element is an array type */ - public static String getSelectorByQueryTypeJSON(String queryType, JSONObject dataMap) + public static String getSelectorByQueryTypeJSON(QuerySchema qSchema, JSONObject dataMap) { String selector; - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType); DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); String fieldName = qSchema.getSelectorName(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java index e412625..e60c262 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java @@ -70,6 +70,7 @@ public class ResponderCLI public static String NUMCOLMULTPARTITIONS = "numColMultPartitions"; public static String USEMODEXPJOIN = "useModExpJoin"; public static String COLMULTREDUCEBYKEY = "colMultReduceByKey"; + public static String ALLOWEMBEDDEDQUERYSCHEMAS = "allowAdHocQuerySchemas"; /** * Create and parse allowable options @@ -343,6 +344,15 @@ public class ResponderCLI SystemConfiguration.setProperty("pir.colMultReduceByKey", "false"); } + if (hasOption(ALLOWEMBEDDEDQUERYSCHEMAS)) + { + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", getOptionValue(ALLOWEMBEDDEDQUERYSCHEMAS)); + } + else + { + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + } + // Load the new local query and data schemas try { @@ -552,19 +562,27 @@ public class ResponderCLI // numColMultPartitions Option optionNumColMultPartitions = new Option("numColMultParts", NUMCOLMULTPARTITIONS, true, "optional, Spark only -- Number of partitions to " + "use when performing column multiplication"); - optionModExpJoin.setRequired(false); - optionModExpJoin.setArgName(NUMCOLMULTPARTITIONS); - optionModExpJoin.setType(String.class); +
optionNumColMultPartitions.setRequired(false); + optionNumColMultPartitions.setArgName(NUMCOLMULTPARTITIONS); + optionNumColMultPartitions.setType(String.class); options.addOption(optionNumColMultPartitions); // colMultReduceByKey Option optionColMultReduceByKey = new Option("colMultRBK", COLMULTREDUCEBYKEY, true, "optional -- 'true' or 'false' -- Spark only -- " + "If true, uses reduceByKey in performing column multiplication; if false, uses groupByKey -> reduce"); - optionModExpJoin.setRequired(false); - optionModExpJoin.setArgName(COLMULTREDUCEBYKEY); - optionModExpJoin.setType(String.class); + optionColMultReduceByKey.setRequired(false); + optionColMultReduceByKey.setArgName(COLMULTREDUCEBYKEY); + optionColMultReduceByKey.setType(String.class); options.addOption(optionColMultReduceByKey); + // allowAdHocQuerySchemas + Option optionAllowEmbeddedQS = new Option("allowEmbeddedQS", ALLOWEMBEDDEDQUERYSCHEMAS, true, "optional -- 'true' or 'false' (defaults to 'false') -- " + + "If true, allows embedded QuerySchemas for a query."); + optionAllowEmbeddedQS.setRequired(false); + optionAllowEmbeddedQS.setArgName(ALLOWEMBEDDEDQUERYSCHEMAS); + optionAllowEmbeddedQS.setType(String.class); + options.addOption(optionAllowEmbeddedQS); + return options; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java index 0a0d7ef..edba66a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java @@ -49,9 +49,8 @@ public class ComputeEncryptedRow // Input: base, exponent, NSquared // <<base,exponent,NSquared>, base^exponent mod N^2> - private static LoadingCache<Tuple3<BigInteger,BigInteger,BigInteger>,BigInteger> expCache = - CacheBuilder.newBuilder().maximumSize(10000) - .build(new CacheLoader<Tuple3<BigInteger,BigInteger,BigInteger>,BigInteger>() + private static LoadingCache<Tuple3<BigInteger,BigInteger,BigInteger>,BigInteger> expCache = CacheBuilder.newBuilder().maximumSize(10000) + .build(new CacheLoader<Tuple3<BigInteger,BigInteger,BigInteger>,BigInteger>() { @Override public BigInteger load(Tuple3<BigInteger,BigInteger,BigInteger> info) throws Exception @@ -152,10 +151,7 @@ public class ComputeEncryptedRow } /** - * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link ArrayList<BigInteger> - * - * - * } + * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link ArrayList}<BigInteger> * <p> * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
* <p> @@ -228,10 +224,8 @@ } /** - * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link <BytesArrayWritable> - * - * - * } given an input modular exponentiation table for the row + * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link BytesArrayWritable} given + * an input modular exponentiation table for the row * <p> * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values. * <p> http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java index 7eb264a..b429377 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java @@ -80,18 +80,18 @@ public class HashSelectorAndPartitionData return returnTuple; } - public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo) throws Exception + public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo, QuerySchema qSchema) throws Exception { Tuple2<Integer,ArrayList<BigInteger>> returnTuple; // Pull the selector based on the query type - String selector = QueryUtils.getSelectorByQueryTypeJSON(queryInfo.getQueryType(), json); + String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json); int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); logger.debug("selector = " + selector + " hash = " + hash); // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(queryInfo.getQueryType(), json, queryInfo.getEmbedSelector()); + ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, json, queryInfo.getEmbedSelector()); returnTuple = new Tuple2<>(hash, hitValPartitions); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java index df3b7d0..22c4cab 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.math.BigInteger; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java index 6eab9fe..0d04eab 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java @@ -130,7 +130,14 @@ public class ComputeResponseTool extends Configured implements Tool query = new HadoopFileSystemStore(fs).recall(queryInputDir, Query.class); queryInfo = query.getQueryInfo(); - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + if (SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true")) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + } logger.info("outputFile = " + outputFile + " outputDirInit = " + outputDirInit + " outputDirColumnMult = " + outputDirColumnMult + " queryInputDir = " + queryInputDir + " stopListFile = " + stopListFile + " numReduceTasks = " + numReduceTasks + " esQuery = " + esQuery + " esResource = " + esResource); @@ -388,6 +395,7 @@ public class ComputeResponseTool extends Configured implements Tool job.getConfiguration().set("baseQuery", baseQuery); job.getConfiguration().set("query", baseQuery); + job.getConfiguration().set("pir.allowAdHocQuerySchemas", SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false")); job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java index b04babd..a244a40 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.HashSet; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -93,7 +92,15 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable { e.printStackTrace(); } - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + + if (ctx.getConfiguration().get("pir.allowAdHocQuerySchemas", "false").equals("true")) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + } dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); try http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java index 
ea57d2d..ab1e7b9 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java @@ -23,7 +23,6 @@ import java.math.BigInteger; import java.util.ArrayList; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -33,7 +32,6 @@ import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; -import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; @@ -97,9 +95,6 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW e.printStackTrace(); } - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); - if (ctx.getConfiguration().get("pirWL.useLocalCache").equals("true")) { useLocalCache = true; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java index 89ce35f..bab4ae9 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java @@ -22,6 +22,8 @@ import java.io.Serializable; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.schema.data.DataSchema; +import org.apache.pirk.schema.query.QuerySchema; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -39,6 +41,10 @@ public class BroadcastVars implements Serializable private Broadcast<QueryInfo> queryInfo = null; + private Broadcast<DataSchema> dataSchema = null; + + private Broadcast<QuerySchema> querySchema = null; + private Broadcast<String> useLocalCache = null; private Broadcast<Boolean> limitHitsPerSelector = null; @@ -72,6 +78,26 @@ public class BroadcastVars implements Serializable queryInfo = jsc.broadcast(queryInfoIn); } + public void setQuerySchema(QuerySchema qSchemaIn) + { + querySchema = jsc.broadcast(qSchemaIn); + } + + public QuerySchema getQuerySchema() + { + return querySchema.getValue(); + } + + public void setDataSchema(DataSchema dSchemaIn) + { + dataSchema = jsc.broadcast(dSchemaIn); + } + + public DataSchema getDataSchema() + { + return dataSchema.getValue(); + } + public void setUseLocalCache(String useLocalCacheInput) { useLocalCache = jsc.broadcast(useLocalCacheInput); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index c6b0d28..169493b 100644 --- 
a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -33,6 +33,7 @@ import org.apache.pirk.inputformat.hadoop.InputFormatConst; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; @@ -171,6 +172,19 @@ public class ComputeResponse bVars.setQuery(query); bVars.setQueryInfo(queryInfo); + QuerySchema qSchema = null; + if (SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true")) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); + } + DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + bVars.setQuerySchema(qSchema); + bVars.setDataSchema(dSchema); + // Set the local cache flag bVars.setUseLocalCache(SystemConfiguration.getProperty("pir.useLocalCache", "true")); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java index 2b28c46..aeab128 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java @@ -51,6 +51,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A private static final Logger logger = LoggerFactory.getLogger(EncRowCalc.class); private Accumulators accum = null; + private BroadcastVars bVars = null; private Query query = null; private QueryInfo queryInfo = null; @@ -59,21 +60,19 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A private boolean limitHitsPerSelector = false; private int maxHitsPerSelector = 0; - public EncRowCalc(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) + public EncRowCalc(Accumulators accumIn, BroadcastVars bvIn) { - accum = pirWLAccum; + accum = accumIn; + bVars = bvIn; - query = pirWLBBVars.getQuery(); - queryInfo = pirWLBBVars.getQueryInfo(); - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); - - if (pirWLBBVars.getUseLocalCache().equals("true")) + query = bVars.getQuery(); + queryInfo = bVars.getQueryInfo(); + if (bVars.getUseLocalCache().equals("true")) { useLocalCache = true; } - limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector(); - maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector(); + limitHitsPerSelector = bVars.getLimitHitsPerSelector(); + maxHitsPerSelector = bVars.getMaxHitsPerSelector(); logger.info("Initialized EncRowCalc - limitHitsPerSelector = " + limitHitsPerSelector + " maxHitsPerSelector = " + maxHitsPerSelector); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java index c855aa8..360b8ef 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java @@ -23,12 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; -import org.apache.pirk.schema.data.DataSchema; -import org.apache.pirk.schema.data.LoadDataSchemas; -import org.apache.pirk.schema.query.LoadQuerySchemas; -import org.apache.pirk.schema.query.QuerySchema; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,28 +40,24 @@ public class EncRowCalcPrecomputedCache implements private static final Logger logger = LoggerFactory.getLogger(EncRowCalcPrecomputedCache.class); private Accumulators accum = null; + private BroadcastVars bVars = null; Query query = null; private boolean limitHitsPerSelector = false; private int maxHitsPerSelector = 0; + private HashMap<Integer,BigInteger> expTable = null; - public EncRowCalcPrecomputedCache(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) + public EncRowCalcPrecomputedCache(Accumulators accumIn, BroadcastVars bvIn) { - accum = pirWLAccum; + accum = accumIn; + bVars = bvIn; - query = pirWLBBVars.getQuery(); - QueryInfo queryInfo = pirWLBBVars.getQueryInfo(); - QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + query = bVars.getQuery(); - if (pirWLBBVars.getUseLocalCache().equals("true")) - { - boolean useLocalCache = true; - } - limitHitsPerSelector = pirWLBBVars.getLimitHitsPerSelector(); - maxHitsPerSelector = pirWLBBVars.getMaxHitsPerSelector(); + limitHitsPerSelector = bVars.getLimitHitsPerSelector(); + maxHitsPerSelector = bVars.getMaxHitsPerSelector(); expTable = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java index bbd0edd..90fef67 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java @@ -25,8 +25,6 @@ import org.apache.hadoop.io.MapWritable; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.responder.wideskies.common.HashSelectorAndPartitionData; import org.apache.pirk.schema.data.DataSchema; -import org.apache.pirk.schema.data.LoadDataSchemas; -import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.spark.api.java.function.PairFunction; import org.slf4j.Logger; @@ -44,15 +42,21 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I private static final Logger logger = LoggerFactory.getLogger(HashSelectorsAndPartitionData.class); + Accumulators 
accum = null; + BroadcastVars bVars = null; + private QueryInfo queryInfo = null; private QuerySchema qSchema = null; private DataSchema dSchema = null; - public HashSelectorsAndPartitionData(Accumulators pirWLAccum, BroadcastVars pirWLBBVars) + public HashSelectorsAndPartitionData(Accumulators accumIn, BroadcastVars bvIn) { - queryInfo = pirWLBBVars.getQueryInfo(); - qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); - dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); + accum = accumIn; + bVars = bvIn; + + queryInfo = bVars.getQueryInfo(); + qSchema = bVars.getQuerySchema(); + dSchema = bVars.getDataSchema(); logger.info("Initialized HashSelectorsAndPartitionData"); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index 4ac3923..80baea5 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -30,6 +30,8 @@ import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.KeyedHash; import org.apache.pirk.utils.SystemConfiguration; @@ -55,6 +57,7 @@ public class Responder private Query query = null; private QueryInfo queryInfo = null; + private QuerySchema qSchema = null; private String queryType = null; @@ -70,6 +73,15 @@ public class Responder queryInfo = query.getQueryInfo(); queryType = queryInfo.getQueryType(); + if (SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true")) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = LoadQuerySchemas.getSchema(queryType); + } + response = new Response(queryInfo); // Columns are allocated as needed, initialized to 1 @@ -115,7 +127,7 @@ public class Responder logger.info("jsonData = " + jsonData.toJSONString()); - String selector = QueryUtils.getSelectorByQueryTypeJSON(queryType, jsonData); + String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, jsonData); addDataElement(selector, jsonData); } br.close(); @@ -158,7 +170,7 @@ public class Responder { // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(queryType, jsonData, queryInfo.getEmbedSelector()); + ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, jsonData, queryInfo.getEmbedSelector()); // Pull the necessary elements int rowIndex = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java index 73995e8..60eaa66 100644 --- 
a/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java +++ b/src/main/java/org/apache/pirk/schema/data/LoadDataSchemas.java @@ -180,7 +180,7 @@ public class LoadDataSchemas String name = eElement.getElementsByTagName("name").item(0).getTextContent().trim().toLowerCase(); String type = eElement.getElementsByTagName("type").item(0).getTextContent().trim(); - // An absent isArray means false, and an empty isArray means true, otherwise take the value. + // An absent isArray means false, and an empty isArray means true, otherwise take the value. String isArray = "false"; Node isArrayNode = eElement.getElementsByTagName("isArray").item(0); if (isArrayNode != null) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/schema/query/QuerySchema.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java index 7610b52..09e4d85 100644 --- a/src/main/java/org/apache/pirk/schema/query/QuerySchema.java +++ b/src/main/java/org/apache/pirk/schema/query/QuerySchema.java @@ -42,7 +42,7 @@ public class QuerySchema implements Serializable private String dataSchemaName = null; // name of the DataSchema for this query schema private TreeSet<String> elementNames = null; // names of elements in the data schema to - // include in the response, order matters for packing/unpacking + // include in the response, order matters for packing/unpacking private String filter = null; // name of filter class to use in data filtering http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java index 020d464..15d7622 100644 --- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -68,6 +68,9 @@ public class DistTestSuite SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); SystemConfiguration.setProperty("pir.maxHitsPerSelector", "100"); + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Set up base configs SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT); SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY)); @@ -131,6 +134,19 @@ public class DistTestSuite // Reset property SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + // Test embedded QuerySchema + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, 
true, 1); + logger.info("Completed testJSONInputMR"); } @@ -144,6 +160,9 @@ public class DistTestSuite SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Set up ES configs SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES); SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0"); @@ -184,6 +203,9 @@ public class DistTestSuite SystemConfiguration.setProperty("pir.numColMultPartitions", "20"); SystemConfiguration.setProperty("pir.colMultReduceByKey", "false"); + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Set up JSON configs SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT); SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY)); @@ -201,6 +223,20 @@ public class DistTestSuite BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2); + // Test embedded QuerySchema + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Test pad columns SystemConfiguration.setProperty("pir.padEmptyColumns", "true"); BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); @@ -255,6 +291,9 @@ public class DistTestSuite SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Set up ES configs SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES); SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java index 51497d6..1816783 100644 --- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -25,6 +25,8 @@ import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.pirk.query.wideskies.QueryUtils; +import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.test.distributed.testsuite.DistTestSuite; import org.apache.pirk.utils.StringUtils; @@ -46,8 +48,7 @@ public class BaseTests public static int dataPartitionBitSize 
= 8; // Selectors for domain and IP queries, queryNum is the first entry for file generation - private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", - "x.y.net")); + private static ArrayList<String> selectorsDomain = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); private static ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24")); // Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed... @@ -73,6 +74,8 @@ public class BaseTests { logger.info("Running testDNSHostnameQuery(): "); + QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_HOSTNAME_QUERY); + int numExpectedResults = 6; ArrayList<QueryResponseJSON> results; if (isDistributed) @@ -161,7 +164,7 @@ public class BaseTests wlJSON.setMapping(Inputs.QTYPE, parseShortArray(dataMap, Inputs.QTYPE)); wlJSON.setMapping(Inputs.RCODE, dataMap.get(Inputs.RCODE)); wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); - wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(Inputs.DNS_HOSTNAME_QUERY, dataMap)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); correctResults.add(wlJSON); } ++i; @@ -196,7 +199,9 @@ public class BaseTests { logger.info("Running testDNSIPQuery(): "); + QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_IP_QUERY); ArrayList<QueryResponseJSON> results; + if (isDistributed) { results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads); @@ -237,7 +242,7 @@ public class BaseTests wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); - wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(Inputs.DNS_IP_QUERY, dataMap)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); correctResults.add(wlJSON); } ++i; @@ -269,7 +274,9 @@ public class BaseTests { logger.info("Running testDNSNXDOMAINQuery(): "); + QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_NXDOMAIN_QUERY); ArrayList<QueryResponseJSON> results; + if (isDistributed) { results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads); @@ -300,7 +307,7 @@ public class BaseTests wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); - wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(Inputs.DNS_NXDOMAIN_QUERY, dataMap)); + wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); correctResults.add(wlJSON); } ++i; @@ -331,7 +338,9 @@ public class BaseTests { logger.info("Running testSRCIPQuery(): "); + QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_SRCIP_QUERY); ArrayList<QueryResponseJSON> results; + int removeTailElements = 0; int numExpectedResults = 1; if (isDistributed) @@ -373,7 +382,7 @@ public class BaseTests qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); 
qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); - qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(Inputs.DNS_SRCIP_QUERY, dataMap)); + qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); correctResults.add(qrJSON); } ++i; @@ -403,7 +412,9 @@ { logger.info("Running testSRCIPQueryNoFilter(): "); + QuerySchema qSchema = LoadQuerySchemas.getSchema(Inputs.DNS_SRCIP_QUERY_NO_FILTER); ArrayList<QueryResponseJSON> results; + int numExpectedResults = 3; if (isDistributed) { @@ -442,7 +453,7 @@ qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP)); qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP)); qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true)); - qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(Inputs.DNS_SRCIP_QUERY_NO_FILTER, dataMap)); + qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap)); correctResults.add(qrJSON); } ++i; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index 5070672..b36ad70 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -101,7 +101,7 @@ public class Inputs public static final String DATA_SCHEMA_FILE_HDFS = "/tmp/testDataSchemaFile.xml"; /** - * Delete the Elastic Search indices that was used for functional testing + * Delete the ElasticSearch indices that were used for functional testing */ public static void deleteESInput() { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java index c33971e..684d04d 100644 --- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java +++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java @@ -33,6 +33,8 @@ import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.responder.wideskies.standalone.Responder; import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.PIRException; @@ -59,6 +61,7 @@ public class StandaloneQuery logger.info("Performing watchlisting: "); ArrayList<QueryResponseJSON> results = null; + QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType); // Create the necessary files LocalFileSystemStore storage = new LocalFileSystemStore(); @@ -81,6 +84,11 @@ public class StandaloneQuery QueryInfo queryInfo = new QueryInfo(BaseTests.queryNum, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType, queryType + "_" + BaseTests.queryNum, BaseTests.paillierBitSize, useExpLookupTable, embedSelector,
useHDFSExpLookupTable); + if (SystemConfiguration.getProperty("pir.embedQuerySchema", "false").equals("true")) + { + queryInfo.addQuerySchema(qSchema); + } + Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty); // Perform the encryption @@ -110,7 +118,7 @@ public class StandaloneQuery logger.info("Query and Responder elements constructed"); for (JSONObject jsonData : dataElements) { - String selector = QueryUtils.getSelectorByQueryTypeJSON(queryType, jsonData); + String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, jsonData); logger.info("selector = " + selector + " numDataElements = " + jsonData.size()); try { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/utils/QueryParserUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java index afef6c9..36892a3 100644 --- a/src/main/java/org/apache/pirk/utils/QueryParserUtils.java +++ b/src/main/java/org/apache/pirk/utils/QueryParserUtils.java @@ -26,11 +26,13 @@ import java.util.regex.Pattern; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; + import org.apache.pirk.inputformat.hadoop.TextArrayWritable; import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; import org.elasticsearch.hadoop.mr.WritableArrayWritable; + +import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/utils/StringUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/StringUtils.java b/src/main/java/org/apache/pirk/utils/StringUtils.java index be8ea09..c06e969 100755 --- a/src/main/java/org/apache/pirk/utils/StringUtils.java +++ b/src/main/java/org/apache/pirk/utils/StringUtils.java @@ -27,12 +27,14 @@ import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; + import org.apache.pirk.schema.data.DataSchema; import org.elasticsearch.hadoop.mr.WritableArrayWritable; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; + +import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/java/org/apache/pirk/utils/SystemConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java index b4db2a6..b529e8f 100755 --- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java +++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java @@ -105,7 +105,7 @@ public class SystemConfiguration File localFile = new File(getProperty(LOCAL_PROPERTY_FILE)); if (localFile.exists()) { - try(InputStream stream = new FileInputStream(localFile);) + try (InputStream stream = new FileInputStream(localFile);) { logger.info("Loading local properties file '" + localFile.getAbsolutePath() + "'"); props.load(stream); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/resources/log4j2.properties 
---------------------------------------------------------------------- diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index e07107b..2a318be 100755 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -25,7 +25,7 @@ log4j.rootLogger=info, stdout, rolling #log4j.rootLogger=debug, info, stdout, rolling # Example of adding a specific package/class at a different level -#log4j.category.responder.wideskies=debug +#log4j.category.org.apache.pirk=debug # BEGIN APPENDER: CONSOLE APPENDER (stdout) @@ -43,7 +43,7 @@ log4j.appender.stdout.layout.ConversionPattern=%5p [%t] %d (%F:%L) - %m%n # BEGIN APPENDER: ROLLING FILE APPENDER (rolling) # first: type of appender (fully qualified class name) log4j.appender.rolling=org.apache.log4j.RollingFileAppender -log4j.appender.rolling.File=./logs/aip.log +log4j.appender.rolling.File=./logs/pirk.log log4j.appender.rolling.MaxFileSize=1MB # number of backups to keep log4j.appender.rolling.MaxBackupIndex=2 http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/main/resources/pirk.properties ---------------------------------------------------------------------- diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties index 49367cb..d6ea68d 100755 --- a/src/main/resources/pirk.properties +++ b/src/main/resources/pirk.properties @@ -86,7 +86,7 @@ query.schemas = none #ES host address - One Elasticsearch node in the cluster - may include port specification es.nodes= none - + #Default HTTP/REST port used for connecting to Elasticsearch es.port=9200 @@ -116,16 +116,16 @@ test.pir.es.resource = none #Pathname in hdfs to place input JSON file for testing test.inputJSONFile = none -test.pir.inputJSONFile = none +test.pir.inputJSONFile = /tmp/testJSONInput #Pathname in hdfs to place output file for testing -test.outputHDFSFile = none +test.outputHDFSFile = /tmp/testOutput #PIR query input dir in hdfs for testing test.pir.queryInputDir = none #PIR stoplist file -test.pir.stopListFile = none +test.pir.stopListFile = /tmp/testStopListFile #Whether or not we are running PIR testing (used as a flag to dump intermediate RDDs for checking) #This should default to false; it is changed to true in the test suite, as applicable http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/test/java/test/general/PartitionUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/general/PartitionUtilsTest.java b/src/test/java/test/general/PartitionUtilsTest.java index 1ee866a..7dc97e2 100644 --- a/src/test/java/test/general/PartitionUtilsTest.java +++ b/src/test/java/test/general/PartitionUtilsTest.java @@ -28,6 +28,7 @@ import org.apache.pirk.utils.SystemConfiguration; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/test/java/test/general/QueryParserUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/general/QueryParserUtilsTest.java b/src/test/java/test/general/QueryParserUtilsTest.java index af2235c..c57676e 100644 --- a/src/test/java/test/general/QueryParserUtilsTest.java +++ b/src/test/java/test/general/QueryParserUtilsTest.java @@ -31,6 +31,7 @@ import org.json.simple.JSONObject; import org.junit.Test; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/test/java/test/schema/query/LoadQuerySchemaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/schema/query/LoadQuerySchemaTest.java b/src/test/java/test/schema/query/LoadQuerySchemaTest.java index c1a5d9e..dee8336 100644 --- a/src/test/java/test/schema/query/LoadQuerySchemaTest.java +++ b/src/test/java/test/schema/query/LoadQuerySchemaTest.java @@ -49,6 +49,8 @@ import test.schema.data.LoadDataSchemaTest; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import test.schema.data.LoadDataSchemaTest; + /** * Test suite for LoadQuerySchema and QuerySchema */ http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/99f9c624/src/test/java/test/wideskies/standalone/StandaloneTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/test/wideskies/standalone/StandaloneTest.java b/src/test/java/test/wideskies/standalone/StandaloneTest.java index cfcaa7a..4b52981 100644 --- a/src/test/java/test/wideskies/standalone/StandaloneTest.java +++ b/src/test/java/test/wideskies/standalone/StandaloneTest.java @@ -66,6 +66,9 @@ public class StandaloneTest ArrayList<JSONObject> dataElements = Inputs.createJSONDataElements(); ArrayList<JSONObject> dataElementsRcode3 = Inputs.getRcode3JSONDataElements(); + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Run tests and use the embedded selector SystemConfiguration.setProperty("pirTest.embedSelector", "true"); BaseTests.testDNSHostnameQuery(dataElements, 1, false); @@ -73,6 +76,20 @@ public class StandaloneTest BaseTests.testDNSIPQuery(dataElements, 3); // numThreads % num elements to encrypt != 0 BaseTests.testDNSNXDOMAINQuery(dataElementsRcode3, 4); // numThreads % num elements to encrypt = 0 + // Test embedded QuerySchema + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + BaseTests.testDNSHostnameQuery(dataElements, 1, false); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, 1, false); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, 1, false); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + // Run tests without using the embedded selector SystemConfiguration.setProperty("pirTest.embedSelector", "false"); BaseTests.testDNSHostnameQuery(dataElements, 1, false);
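
The StandaloneTest changes above run testDNSHostnameQuery under all four combinations of pir.allowAdHocQuerySchemas and pir.embedQuerySchema, resetting pir.embedQuerySchema to false afterward. A minimal sketch of the querier-side pattern those flags drive follows; it reuses only the property name, LoadQuerySchemas.getSchema, and the QueryInfo.addQuerySchema call visible in the StandaloneQuery diff above, while the helper class and method name are illustrative assumptions, not part of this commit:

import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.utils.SystemConfiguration;

// Illustrative helper (not part of this commit): embeds the QuerySchema in the
// QueryInfo when pir.embedQuerySchema is set, mirroring the gating added to
// StandaloneQuery in this commit
public class EmbedQuerySchemaSketch
{
  public static void embedIfConfigured(QueryInfo queryInfo, String queryType) throws Exception
  {
    if (SystemConfiguration.getProperty("pir.embedQuerySchema", "false").equals("true"))
    {
      QuerySchema qSchema = LoadQuerySchemas.getSchema(queryType);
      queryInfo.addQuerySchema(qSchema);
    }
  }
}

Embedding in this way makes the serialized query self-describing, which appears to be why the tests also toggle pir.allowAdHocQuerySchemas, the flag that would let a responder accept such a schema.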

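On the read side, a consumer of the query has to choose between the schema embedded in the QueryInfo and one loaded from its own configuration. A possible resolution sketch is below; pir.allowAdHocQuerySchemas, SystemConfiguration.getProperty, and LoadQuerySchemas.getSchema come from the diffs above, while the getQuerySchema() and getQueryType() accessors are assumptions made for illustration:

import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.schema.query.LoadQuerySchemas;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.utils.SystemConfiguration;

// Hypothetical resolution logic, sketched from the flag combinations that
// StandaloneTest cycles through above; not the committed implementation
public class QuerySchemaResolverSketch
{
  public static QuerySchema resolve(QueryInfo queryInfo) throws Exception
  {
    QuerySchema qSchema = null;
    if (SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"))
    {
      qSchema = queryInfo.getQuerySchema(); // assumed accessor paired with addQuerySchema
    }
    if (qSchema == null)
    {
      qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); // getQueryType() is assumed
    }
    return qSchema;
  }
}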