Repository: incubator-pirk Updated Branches: refs/heads/master 9f3eb4beb -> 371e53903
PIRK-12 -- Refactor EncryptQuery#encrypt(int) - closes apache/incubator-pirk#14 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/371e5390 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/371e5390 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/371e5390 Branch: refs/heads/master Commit: 371e53903bc56a390819e710d9746e44cbe9d548 Parents: 9f3eb4b Author: tellison <[email protected]> Authored: Thu Jul 21 14:51:39 2016 -0400 Committer: eawilliams <[email protected]> Committed: Thu Jul 21 14:51:39 2016 -0400 ---------------------------------------------------------------------- .../querier/wideskies/encrypt/EncryptQuery.java | 131 +++++++++---------- .../wideskies/encrypt/EncryptQueryRunnable.java | 48 ++----- 2 files changed, 76 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/371e5390/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index 9f8c46f..ca38fc5 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -20,11 +20,9 @@ package org.apache.pirk.querier.wideskies.encrypt; import java.io.File; import java.io.IOException; -import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -121,56 +119,63 @@ public class EncryptQuery { query = new Query(queryInfo, paillier.getN()); - int dataPartitionBitSize = queryInfo.getDataPartitionBitSize(); + // Determine the query vector mappings for the selectors; vecPosition -> selectorNum + HashMap<Integer,Integer> selectorQueryVecMapping = computeSelectorQueryVecMap(); + + // Form the embedSelectorMap + populateEmbeddedSelectorMap(); + + parallelEncrypt(numThreads, selectorQueryVecMapping); + + // Generate the expTable in Query, if we are using it and if + // useHDFSExpLookupTable is false -- if we are generating it as standalone and not on the cluster + if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) + { + logger.info("Starting expTable generation"); + + // This has to be reasonably multithreaded or it takes forever... + query.generateExpTable(Math.max(8, numThreads)); + } + // Set the Querier object + querier = new Querier(queryInfo, selectors, paillier, query, embedSelectorMap); + } + + private HashMap<Integer,Integer> computeSelectorQueryVecMap() + { String hashKey = queryInfo.getHashKey(); - int hashBitSize = queryInfo.getHashBitSize(); int keyCounter = 0; + int numSelectors = selectors.size(); + HashSet<Integer> hashes = new HashSet<Integer>(numSelectors); + HashMap<Integer,Integer> selectorQueryVecMapping = new HashMap<Integer,Integer>(numSelectors); - // Determine the query vector mappings for the selectors; vecPosition -> selectorNum - HashMap<Integer,Integer> selectorQueryVecMapping = new HashMap<Integer,Integer>(); - HashSet<Integer> hashes = new HashSet<Integer>(); - while (true) + for (int index = 0; index < numSelectors; index++) { - int j = 0; - boolean uniqueHashes = true; // All keyed hashes of the selectors must be unique - while (j < selectors.size()) - { - String selector = selectors.get(j); - logger.debug("j = " + j + " Encrypting selector = " + selector); - - int hash = KeyedHash.hash(hashKey, hashBitSize, selector); - if (hashes.contains(hash)) - { - uniqueHashes = false; - break; - } - else - { - hashes.add(hash); - selectorQueryVecMapping.put(hash, j); - - logger.debug("j = " + j + " hash = " + hash); - } - ++j; - } + String selector = selectors.get(index); + int hash = KeyedHash.hash(hashKey, queryInfo.getHashBitSize(), selector); - if (uniqueHashes) + // All keyed hashes of the selectors must be unique + if (hashes.add(hash)) { - break; // if all of our hashes are unique, we are done + // The hash is unique + selectorQueryVecMapping.put(hash, index); + logger.debug("index = " + index + "selector = " + selector + " hash = " + hash); } else - // Iterate with a new key until we have unique keyed hashes among the selectors { + // Hash collision hashes.clear(); selectorQueryVecMapping.clear(); - - ++keyCounter; - hashKey += keyCounter; + hashKey = queryInfo.getHashKey() + ++keyCounter; + logger.debug("index = " + index + "selector = " + selector + " hash collision = " + hash + " new key = " + hashKey); + index = 0; } } + return selectorQueryVecMapping; + } - // Form the embedSelectorMap + private void populateEmbeddedSelectorMap() + { QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); DataSchema dSchema = LoadDataSchemas.getSchema(qSchema.getDataSchemaName()); String type = dSchema.getElementType(qSchema.getSelectorName()); @@ -185,19 +190,23 @@ public class EncryptQuery { logger.info("Caught exception for selector = " + selector); e.printStackTrace(); + // TODO: Check: should continue? } embedSelectorMap.put(sNum, embeddedSelector); ++sNum; } + } + private void parallelEncrypt(int numThreads, HashMap<Integer,Integer> selectorQueryVecMapping) throws PIRException + { // Encrypt and form the query vector ExecutorService es = Executors.newCachedThreadPool(); + ArrayList<EncryptQueryRunnable> runnables = new ArrayList<EncryptQueryRunnable>(numThreads); + int numElements = 1 << queryInfo.getHashBitSize(); // 2^hashBitSize - int elementsPerThread = (int) (Math.floor(Math.pow(2, hashBitSize) / numThreads)); - - ArrayList<EncryptQueryRunnable> runnables = new ArrayList<EncryptQueryRunnable>(); - + // Split the work across the requested number of threads + int elementsPerThread = numElements / numThreads; for (int i = 0; i < numThreads; ++i) { // Grab the range of the thread @@ -205,10 +214,9 @@ public class EncryptQuery int stop = start + elementsPerThread - 1; if (i == (numThreads - 1)) { - stop = (int) (Math.pow(2, hashBitSize) - 1); + stop = numElements - 1; } - // Create the runnable and execute // Copy selectorQueryVecMapping (if numThreads > 1) so we don't have to synchronize - only has size = selectors.size() HashMap<Integer,Integer> selectorQueryVecMappingCopy = null; if (numThreads == 1) @@ -219,14 +227,24 @@ public class EncryptQuery { selectorQueryVecMappingCopy = new HashMap<Integer,Integer>(selectorQueryVecMapping); } - EncryptQueryRunnable runEnc = new EncryptQueryRunnable(dataPartitionBitSize, hashBitSize, paillier.clone(), selectorQueryVecMappingCopy, start, stop); + + // Create the runnable and execute + EncryptQueryRunnable runEnc = new EncryptQueryRunnable(queryInfo.getDataPartitionBitSize(), paillier.clone(), selectorQueryVecMappingCopy, start, stop); runnables.add(runEnc); es.execute(runEnc); } // Allow threads to complete es.shutdown(); // previously submitted tasks are executed, but no new tasks will be accepted - boolean finished = es.awaitTermination(1, TimeUnit.DAYS); // waits until all tasks complete or until the specified timeout + boolean finished = false; + try + { + // waits until all tasks complete or until the specified timeout + finished = es.awaitTermination(1, TimeUnit.DAYS); + } catch (InterruptedException e) + { + Thread.interrupted(); + } if (!finished) { @@ -236,30 +254,9 @@ public class EncryptQuery // Pull all encrypted elements and add to Query for (EncryptQueryRunnable runner : runnables) { - TreeMap<Integer,BigInteger> encValues = runner.getEncryptedValues(); - query.addQueryElements(encValues); + query.addQueryElements(runner.getEncryptedValues()); } logger.info("Completed creation of encrypted query vectors"); - - // Generate the expTable in Query, if we are using it and if - // useHDFSExpLookupTable is false -- if we are generating it as standalone and not on the cluster - if (query.getQueryInfo().getUseExpLookupTable() && !query.getQueryInfo().getUseHDFSExpLookupTable()) - { - logger.info("Starting expTable generation"); - - // This has to be reasonably multithreaded or it takes forever... - if (numThreads < 8) - { - query.generateExpTable(8); - } - else - { - query.generateExpTable(numThreads); - } - } - - // Set the Querier object - querier = new Querier(queryInfo, selectors, paillier, query, embedSelectorMap); } /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/371e5390/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java index c060197..04fb929 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java @@ -36,7 +36,6 @@ public class EncryptQueryRunnable implements Runnable private static Logger logger = LogUtils.getLoggerForThisClass(); int dataPartitionBitSize = 0; - int hashBitSize = 0; int start = 0; // start of computing range for the runnable int stop = 0; // stop, inclusive, of the computing range for the runnable @@ -45,11 +44,10 @@ public class EncryptQueryRunnable implements Runnable TreeMap<Integer,BigInteger> encryptedValues = null; // holds the ordered encrypted values to pull after thread computation is complete - public EncryptQueryRunnable(int dataPartitionBitSizeInput, int hashBitSizeInput, Paillier paillierInput, - HashMap<Integer,Integer> selectorQueryVecMappingInput, int startInput, int stopInput) + public EncryptQueryRunnable(int dataPartitionBitSizeInput, Paillier paillierInput, HashMap<Integer,Integer> selectorQueryVecMappingInput, int startInput, + int stopInput) { dataPartitionBitSize = dataPartitionBitSizeInput; - hashBitSize = hashBitSizeInput; paillier = paillierInput; selectorQueryVecMapping = selectorQueryVecMappingInput; @@ -73,42 +71,20 @@ public class EncryptQueryRunnable implements Runnable @Override public void run() { - int i = start; - while (i <= stop) + for (int i = start; i <= stop; i++) { - if (selectorQueryVecMapping.containsKey(i)) + Integer selectorNum = selectorQueryVecMapping.get(i); + BigInteger valToEnc = (selectorNum == null) ? BigInteger.ZERO : (BigInteger.valueOf(2)).pow(selectorNum * dataPartitionBitSize); + BigInteger encVal; + try { - int selectorNum = selectorQueryVecMapping.get(i); - BigInteger valToEnc = (BigInteger.valueOf(2)).pow(selectorNum * dataPartitionBitSize); - - BigInteger encVal; - try - { - encVal = paillier.encrypt(valToEnc); - } catch (PIRException e) - { - e.printStackTrace(); - throw new RuntimeException(e.toString()); - } - encryptedValues.put(i, encVal); - - logger.debug("selectorNum = " + selectorNum + " valToEnc = " + valToEnc + " envVal = " + encVal); - } - else + encVal = paillier.encrypt(valToEnc); + } catch (PIRException e) { - BigInteger encZero; - try - { - encZero = paillier.encrypt(BigInteger.ZERO); - } catch (PIRException e) - { - e.printStackTrace(); - throw new RuntimeException(e.toString()); - } - encryptedValues.put(i, encZero); + throw new RuntimeException(e); } - ++i; + encryptedValues.put(i, encVal); + logger.debug("selectorNum = " + selectorNum + " valToEnc = " + valToEnc + " envVal = " + encVal); } } - }
