Repository: incubator-pirk Updated Branches: refs/heads/master bda0ad76a -> e914b1118
Pass query elements into Query constructor -- closes apache/incubator-pirk#82 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/e914b111 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/e914b111 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/e914b111 Branch: refs/heads/master Commit: e914b1118c745d10cf81528f8a9aac7b54595ee9 Parents: bda0ad7 Author: tellison <[email protected]> Authored: Tue Aug 30 13:02:50 2016 -0400 Committer: eawilliams <[email protected]> Committed: Tue Aug 30 13:02:50 2016 -0400 ---------------------------------------------------------------------- .../querier/wideskies/encrypt/EncryptQuery.java | 30 +++++++------- .../wideskies/encrypt/EncryptQueryTask.java | 2 +- .../org/apache/pirk/query/wideskies/Query.java | 41 ++++++-------------- .../apache/pirk/query/wideskies/QueryInfo.java | 4 +- .../mapreduce/ComputeResponseTool.java | 9 +++-- .../wideskies/mapreduce/RowCalcReducer.java | 2 +- .../wideskies/spark/ComputeExpLookupTable.java | 6 +-- .../wideskies/spark/ComputeResponse.java | 2 +- .../responder/wideskies/spark/EncRowCalc.java | 2 +- .../wideskies/standalone/Responder.java | 2 +- .../java/org/apache/pirk/utils/KeyedHash.java | 1 - 11 files changed, 43 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index 60ba859..91136d1 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -122,8 +123,6 @@ public class EncryptQuery */ public Querier encrypt(int numThreads) throws InterruptedException, PIRException { - Query query = new Query(queryInfo, paillier.getN()); - // Determine the query vector mappings for the selectors; vecPosition -> selectorNum Map<Integer,Integer> selectorQueryVecMapping = computeSelectorQueryVecMap(); @@ -133,18 +132,23 @@ public class EncryptQuery // if the selector is of variable lengths Map<Integer,String> embedSelectorMap = computeEmbeddedSelectorMap(); + SortedMap<Integer,BigInteger> queryElements; if (numThreads == 1) { - serialEncrypt(query, selectorQueryVecMapping); + queryElements = serialEncrypt(selectorQueryVecMapping); + logger.info("Completed serial creation of encrypted query vectors"); } else { - parallelEncrypt(Math.max(2, numThreads), query, selectorQueryVecMapping); + queryElements = parallelEncrypt(selectorQueryVecMapping, Math.max(2, numThreads)); + logger.info("Completed parallel creation of encrypted query vectors"); } + Query query = new Query(queryInfo, paillier.getN(), queryElements); + // Generate the expTable in Query, if we are using it and if // useHDFSExpLookupTable is false -- if we are generating it as standalone and not on the cluster - if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) + if (queryInfo.useExpLookupTable() && !queryInfo.useHDFSExpLookupTable()) { logger.info("Starting expTable generation"); query.generateExpTable(); @@ -208,22 +212,21 @@ public class EncryptQuery } /* - * Perform the encryption using a single thread, and avoiding the overhead of thread management. + * Perform the encryption using a single thread, avoiding the overhead of thread management. */ - private void serialEncrypt(Query query, Map<Integer,Integer> selectorQueryVecMapping) throws PIRException + private SortedMap<Integer,BigInteger> serialEncrypt(Map<Integer,Integer> selectorQueryVecMapping) throws PIRException { int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize EncryptQueryTask task = new EncryptQueryTask(queryInfo.getDataPartitionBitSize(), paillier, selectorQueryVecMapping, 0, numElements - 1); - query.addQueryElements(task.call()); - logger.info("Completed serial creation of encrypted query vectors"); + return task.call(); } /* * Performs the encryption with numThreads. */ - private void parallelEncrypt(int numThreads, Query query, Map<Integer,Integer> selectorQueryVecMapping) throws InterruptedException, PIRException + private SortedMap<Integer,BigInteger> parallelEncrypt(Map<Integer,Integer> selectorQueryVecMapping, int numThreads) throws InterruptedException, PIRException { // Encrypt and form the query vector ExecutorService es = Executors.newCachedThreadPool(); @@ -243,16 +246,17 @@ public class EncryptQuery } // Create the runnable and execute - EncryptQueryTask runEnc = new EncryptQueryTask(queryInfo.getDataPartitionBitSize(), paillier.clone(), selectorQueryVecMapping, start, stop); + EncryptQueryTask runEnc = new EncryptQueryTask(queryInfo.getDataPartitionBitSize(), paillier, selectorQueryVecMapping, start, stop); futures.add(es.submit(runEnc)); } // Pull all encrypted elements and add to resultMap + SortedMap<Integer,BigInteger> queryElements = new TreeMap<>(); try { for (Future<SortedMap<Integer,BigInteger>> future : futures) { - query.addQueryElements(future.get(1, TimeUnit.DAYS)); + queryElements.putAll(future.get(1, TimeUnit.DAYS)); } } catch (TimeoutException | ExecutionException e) { @@ -261,6 +265,6 @@ public class EncryptQuery es.shutdown(); - logger.info("Completed parallel creation of encrypted query vectors"); + return queryElements; } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java index 2993ee5..4f0bdf2 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryTask.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; /** * Runnable class for multithreaded PIR encryption */ -class EncryptQueryTask implements Callable<SortedMap<Integer,BigInteger>> +final class EncryptQueryTask implements Callable<SortedMap<Integer,BigInteger>> { private static final Logger logger = LoggerFactory.getLogger(EncryptQueryTask.class); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/query/wideskies/Query.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java index b0322d0..51e6bb4 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/Query.java +++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java @@ -23,7 +23,6 @@ import java.math.BigInteger; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -42,9 +41,9 @@ public class Query implements Serializable, Storable private static final Logger logger = LoggerFactory.getLogger(Query.class); - private final QueryInfo qInfo; // holds all query info + private final QueryInfo queryInfo; // holds all query info - private final TreeMap<Integer,BigInteger> queryElements = new TreeMap<>(); // query elements - ordered on insertion + private final SortedMap<Integer,BigInteger> queryElements; // query elements - ordered on insertion // lookup table for exponentiation of query vectors - based on dataPartitionBitSize // element -> <power, element^power mod N^2> @@ -57,19 +56,20 @@ public class Query implements Serializable, Storable private final BigInteger N; // N=pq, RSA modulus for the Paillier encryption associated with the queryElements private final BigInteger NSquared; - public Query(QueryInfo queryInfoIn, BigInteger NInput) + public Query(QueryInfo queryInfo, BigInteger N, SortedMap<Integer,BigInteger> queryElements) { - qInfo = queryInfoIn; - N = NInput; + this.queryInfo = queryInfo; + this.N = N; NSquared = N.pow(2); + this.queryElements = queryElements; } public QueryInfo getQueryInfo() { - return qInfo; + return queryInfo; } - public TreeMap<Integer,BigInteger> getQueryElements() + public SortedMap<Integer,BigInteger> getQueryElements() { return queryElements; } @@ -104,33 +104,13 @@ public class Query implements Serializable, Storable expFileBasedLookup = expInput; } - public Map<BigInteger,Map<Integer,BigInteger>> getExpTable() - { - return expTable; - } - - public void setExpTable(Map<BigInteger,Map<Integer,BigInteger>> expTableInput) - { - expTable = expTableInput; - } - - public void addQueryElements(SortedMap<Integer,BigInteger> elements) - { - queryElements.putAll(elements); - } - - public boolean containsElement(BigInteger element) - { - return queryElements.containsValue(element); - } - /** * This should be called after all query elements have been added in order to generate the expTable. For int exponentiation with BigIntegers, assumes that * dataPartitionBitSize < 32. */ public void generateExpTable() { - int maxValue = (1 << qInfo.getDataPartitionBitSize()) - 1; // 2^partitionBitSize - 1 + int maxValue = (1 << queryInfo.getDataPartitionBitSize()) - 1; // 2^partitionBitSize - 1 queryElements.values().parallelStream().forEach(new Consumer<BigInteger>() { @@ -151,6 +131,7 @@ public class Query implements Serializable, Storable public BigInteger getExp(BigInteger value, int power) { - return expTable.get(value).get(power); + Map<Integer,BigInteger> powerMap = expTable.get(value); + return (powerMap == null) ? null : powerMap.get(power); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java index 76cd755..1482137 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java +++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java @@ -166,12 +166,12 @@ public class QueryInfo implements Serializable, Cloneable return dataPartitionBitSize; } - public boolean getUseExpLookupTable() + public boolean useExpLookupTable() { return useExpLookupTable; } - public boolean getUseHDFSExpLookupTable() + public boolean useHDFSExpLookupTable() { return useHDFSExpLookupTable; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java index 544e3ed..1d06f86 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java @@ -24,7 +24,8 @@ import java.io.InputStreamReader; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; -import java.util.TreeMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -154,7 +155,7 @@ public class ComputeResponseTool extends Configured implements Tool Path outPathFinal = new Path(outputDirFinal); // If we are using distributed exp tables -- Create the expTable file in hdfs for this query, if it doesn't exist - if ((queryInfo.getUseHDFSExpLookupTable() || useHDFSLookupTable) && query.getExpFileBasedLookup().isEmpty()) + if ((queryInfo.useHDFSExpLookupTable() || useHDFSLookupTable) && query.getExpFileBasedLookup().isEmpty()) { success = computeExpTable(); } @@ -243,8 +244,8 @@ public class ComputeResponseTool extends Configured implements Tool fs.delete(splitDir, true); } // Write the query hashes to the split files - TreeMap<Integer,BigInteger> queryElements = query.getQueryElements(); - ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet()); + Map<Integer,BigInteger> queryElements = query.getQueryElements(); + List<Integer> keys = new ArrayList<>(queryElements.keySet()); int numSplits = SystemConfiguration.getIntProperty("pir.expCreationSplits", 100); int elementsPerSplit = queryElements.size() / numSplits; // Integral division. http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java index 75b0529..0cb8ab9 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java @@ -115,7 +115,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW logger.debug("Processing reducer for hash = " + rowIndex); ctx.getCounter(MRStats.NUM_HASHES_REDUCER).increment(1); - if (queryInfo.getUseHDFSExpLookupTable()) + if (queryInfo.useHDFSExpLookupTable()) { ComputeEncryptedRow.loadCacheFromHDFS(fs, query.getExpFile(rowIndex.get()), query); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java index 6b64e35..a5469d1 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java @@ -22,8 +22,8 @@ import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.TreeMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -83,8 +83,8 @@ public class ComputeExpLookupTable } // Write the query hashes to a RDD - TreeMap<Integer,BigInteger> queryElements = query.getQueryElements(); - ArrayList<Integer> keys = new ArrayList<>(queryElements.keySet()); + Map<Integer,BigInteger> queryElements = query.getQueryElements(); + List<Integer> keys = new ArrayList<>(queryElements.keySet()); int numSplits = SystemConfiguration.getIntProperty("pir.expCreationSplits", 100); JavaRDD<Integer> queryHashes = sc.parallelize(keys, numSplits); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index 6014435..f2a54d7 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -335,7 +335,7 @@ public class ComputeResponse logger.info("Performing query: "); // If we are using distributed exp tables -- Create the expTable file in hdfs for this query, if it doesn't exist - if ((queryInfo.getUseHDFSExpLookupTable() || useHDFSLookupTable) && query.getExpFileBasedLookup().isEmpty()) + if ((queryInfo.useHDFSExpLookupTable() || useHDFSLookupTable) && query.getExpFileBasedLookup().isEmpty()) { // <queryHash, <<power>,<element^power mod N^2>> JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations = ComputeExpLookupTable.computeExpTable(sc, fs, bVars, query, queryInput, http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java index ef279e2..20f02ad 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java @@ -78,7 +78,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<L int rowIndex = hashDocTuple._1; accum.incNumHashes(1); - if (queryInfo.getUseHDFSExpLookupTable()) + if (queryInfo.useHDFSExpLookupTable()) { FileSystem fs; try http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index e4bc03f..75cd292 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -190,7 +190,7 @@ public class Responder logger.debug("Before: columns.get(" + (i + rowCounter) + ") = " + columns.get(i + rowCounter)); BigInteger exp; - if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) // using the standalone + if (query.getQueryInfo().useExpLookupTable() && !query.getQueryInfo().useHDFSExpLookupTable()) // using the standalone // lookup table { exp = query.getExp(rowQuery, hitValPartitions.get(i).intValue()); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/e914b111/src/main/java/org/apache/pirk/utils/KeyedHash.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/KeyedHash.java b/src/main/java/org/apache/pirk/utils/KeyedHash.java index 665cb3f..e23cbf8 100644 --- a/src/main/java/org/apache/pirk/utils/KeyedHash.java +++ b/src/main/java/org/apache/pirk/utils/KeyedHash.java @@ -74,7 +74,6 @@ public class KeyedHash } catch (NoSuchAlgorithmException e) { - logger.info(e.toString()); bitLimitedHash = hash(key, bitSize, input); }
