Repository: incubator-pirk
Updated Branches:
  refs/heads/master 9f3eb4beb -> 371e53903


PIRK-12 -- Refactor EncryptQuery#encrypt(int) - closes apache/incubator-pirk#14


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/371e5390
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/371e5390
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/371e5390

Branch: refs/heads/master
Commit: 371e53903bc56a390819e710d9746e44cbe9d548
Parents: 9f3eb4b
Author: tellison <[email protected]>
Authored: Thu Jul 21 14:51:39 2016 -0400
Committer: eawilliams <[email protected]>
Committed: Thu Jul 21 14:51:39 2016 -0400

----------------------------------------------------------------------
 .../querier/wideskies/encrypt/EncryptQuery.java | 131 +++++++++----------
 .../wideskies/encrypt/EncryptQueryRunnable.java |  48 ++-----
 2 files changed, 76 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/371e5390/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
index 9f8c46f..ca38fc5 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java
@@ -20,11 +20,9 @@ package org.apache.pirk.querier.wideskies.encrypt;
 
 import java.io.File;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -121,56 +119,63 @@ public class EncryptQuery
   {
     query = new Query(queryInfo, paillier.getN());
 
-    int dataPartitionBitSize = queryInfo.getDataPartitionBitSize();
+    // Determine the query vector mappings for the selectors; vecPosition -> 
selectorNum
+    HashMap<Integer,Integer> selectorQueryVecMapping = 
computeSelectorQueryVecMap();
+
+    // Form the embedSelectorMap
+    populateEmbeddedSelectorMap();
+
+    parallelEncrypt(numThreads, selectorQueryVecMapping);
+
+    // Generate the expTable in Query, if we are using it and if
+    // useHDFSExpLookupTable is false -- if we are generating it as standalone 
and not on the cluster
+    if (query.getQueryInfo().getUseExpLookupTable() && 
!query.getQueryInfo().getUseHDFSExpLookupTable())
+    {
+      logger.info("Starting expTable generation");
+
+      // This has to be reasonably multithreaded or it takes forever...
+      query.generateExpTable(Math.max(8, numThreads));
+    }
 
+    // Set the Querier object
+    querier = new Querier(queryInfo, selectors, paillier, query, 
embedSelectorMap);
+  }
+
+  private HashMap<Integer,Integer> computeSelectorQueryVecMap()
+  {
     String hashKey = queryInfo.getHashKey();
-    int hashBitSize = queryInfo.getHashBitSize();
     int keyCounter = 0;
+    int numSelectors = selectors.size();
+    HashSet<Integer> hashes = new HashSet<Integer>(numSelectors);
+    HashMap<Integer,Integer> selectorQueryVecMapping = new 
HashMap<Integer,Integer>(numSelectors);
 
-    // Determine the query vector mappings for the selectors; vecPosition -> 
selectorNum
-    HashMap<Integer,Integer> selectorQueryVecMapping = new 
HashMap<Integer,Integer>();
-    HashSet<Integer> hashes = new HashSet<Integer>();
-    while (true)
+    for (int index = 0; index < numSelectors; index++)
     {
-      int j = 0;
-      boolean uniqueHashes = true; // All keyed hashes of the selectors must 
be unique
-      while (j < selectors.size())
-      {
-        String selector = selectors.get(j);
-        logger.debug("j = " + j + " Encrypting selector = " + selector);
-
-        int hash = KeyedHash.hash(hashKey, hashBitSize, selector);
-        if (hashes.contains(hash))
-        {
-          uniqueHashes = false;
-          break;
-        }
-        else
-        {
-          hashes.add(hash);
-          selectorQueryVecMapping.put(hash, j);
-
-          logger.debug("j = " + j + " hash = " + hash);
-        }
-        ++j;
-      }
+      String selector = selectors.get(index);
+      int hash = KeyedHash.hash(hashKey, queryInfo.getHashBitSize(), selector);
 
-      if (uniqueHashes)
+      // All keyed hashes of the selectors must be unique
+      if (hashes.add(hash))
       {
-        break; // if all of our hashes are unique, we are done
+        // The hash is unique so far: record vecPosition (hash) -> selectorNum (index)
+        selectorQueryVecMapping.put(hash, index);
+        logger.debug("index = " + index + " selector = " + selector + " hash = 
" + hash);
       }
       else
-      // Iterate with a new key until we have unique keyed hashes among the 
selectors
       {
+        // Hash collision: discard the partial results and restart from the first selector with a new key
         hashes.clear();
         selectorQueryVecMapping.clear();
-
-        ++keyCounter;
-        hashKey += keyCounter;
+        hashKey = queryInfo.getHashKey() + ++keyCounter;
+        logger.debug("index = " + index + " selector = " + selector + " hash 
collision = " + hash + " new key = " + hashKey);
+        index = -1; // the loop increment brings this back to 0, so selector 0 is re-hashed too
       }
     }
+    return selectorQueryVecMapping;
+  }
 
-    // Form the embedSelectorMap
+  private void populateEmbeddedSelectorMap()
+  {
     QuerySchema qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType());
     DataSchema dSchema = 
LoadDataSchemas.getSchema(qSchema.getDataSchemaName());
     String type = dSchema.getElementType(qSchema.getSelectorName());
@@ -185,19 +190,23 @@ public class EncryptQuery
       {
         logger.info("Caught exception for selector = " + selector);
         e.printStackTrace();
+        // TODO: Check: should continue?
       }
 
       embedSelectorMap.put(sNum, embeddedSelector);
       ++sNum;
     }
+  }
 
+  private void parallelEncrypt(int numThreads, HashMap<Integer,Integer> 
selectorQueryVecMapping) throws PIRException
+  {
     // Encrypt and form the query vector
     ExecutorService es = Executors.newCachedThreadPool();
+    ArrayList<EncryptQueryRunnable> runnables = new 
ArrayList<EncryptQueryRunnable>(numThreads);
+    int numElements = 1 << queryInfo.getHashBitSize();  // 2^hashBitSize
 
-    int elementsPerThread = (int) (Math.floor(Math.pow(2, hashBitSize) / 
numThreads));
-
-    ArrayList<EncryptQueryRunnable> runnables = new 
ArrayList<EncryptQueryRunnable>();
-
+    // Split the work across the requested number of threads
+    int elementsPerThread = numElements / numThreads;
     for (int i = 0; i < numThreads; ++i)
     {
       // Grab the range of the thread
@@ -205,10 +214,9 @@ public class EncryptQuery
       int stop = start + elementsPerThread - 1;
       if (i == (numThreads - 1))
       {
-        stop = (int) (Math.pow(2, hashBitSize) - 1);
+        stop = numElements - 1;
       }
 
-      // Create the runnable and execute
       // Copy selectorQueryVecMapping (if numThreads > 1) so we don't have to 
synchronize - only has size = selectors.size()
       HashMap<Integer,Integer> selectorQueryVecMappingCopy = null;
       if (numThreads == 1)
@@ -219,14 +227,24 @@ public class EncryptQuery
       {
         selectorQueryVecMappingCopy = new 
HashMap<Integer,Integer>(selectorQueryVecMapping);
       }
-      EncryptQueryRunnable runEnc = new 
EncryptQueryRunnable(dataPartitionBitSize, hashBitSize, paillier.clone(), 
selectorQueryVecMappingCopy, start, stop);
+
+      // Create the runnable and execute
+      EncryptQueryRunnable runEnc = new 
EncryptQueryRunnable(queryInfo.getDataPartitionBitSize(), paillier.clone(), 
selectorQueryVecMappingCopy, start, stop);
       runnables.add(runEnc);
       es.execute(runEnc);
     }
 
     // Allow threads to complete
     es.shutdown(); // previously submitted tasks are executed, but no new 
tasks will be accepted
-    boolean finished = es.awaitTermination(1, TimeUnit.DAYS); // waits until 
all tasks complete or until the specified timeout
+    boolean finished = false;
+    try
+    {
+      // waits until all tasks complete or until the specified timeout
+      finished = es.awaitTermination(1, TimeUnit.DAYS);
+    } catch (InterruptedException e)
+    {
+      Thread.currentThread().interrupt(); // restore the interrupt status instead of clearing it
+    }
 
     if (!finished)
     {
@@ -236,30 +254,9 @@ public class EncryptQuery
     // Pull all encrypted elements and add to Query
     for (EncryptQueryRunnable runner : runnables)
     {
-      TreeMap<Integer,BigInteger> encValues = runner.getEncryptedValues();
-      query.addQueryElements(encValues);
+      query.addQueryElements(runner.getEncryptedValues());
     }
     logger.info("Completed creation of encrypted query vectors");
-
-    // Generate the expTable in Query, if we are using it and if
-    // useHDFSExpLookupTable is false -- if we are generating it as standalone 
and not on the cluster
-    if (query.getQueryInfo().getUseExpLookupTable() && 
!query.getQueryInfo().getUseHDFSExpLookupTable())
-    {
-      logger.info("Starting expTable generation");
-
-      // This has to be reasonably multithreaded or it takes forever...
-      if (numThreads < 8)
-      {
-        query.generateExpTable(8);
-      }
-      else
-      {
-        query.generateExpTable(numThreads);
-      }
-    }
-
-    // Set the Querier object
-    querier = new Querier(queryInfo, selectors, paillier, query, 
embedSelectorMap);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/371e5390/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java
index c060197..04fb929 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQueryRunnable.java
@@ -36,7 +36,6 @@ public class EncryptQueryRunnable implements Runnable
   private static Logger logger = LogUtils.getLoggerForThisClass();
 
   int dataPartitionBitSize = 0;
-  int hashBitSize = 0;
   int start = 0; // start of computing range for the runnable
   int stop = 0; // stop, inclusive, of the computing range for the runnable
 
@@ -45,11 +44,10 @@ public class EncryptQueryRunnable implements Runnable
 
   TreeMap<Integer,BigInteger> encryptedValues = null; // holds the ordered 
encrypted values to pull after thread computation is complete
 
-  public EncryptQueryRunnable(int dataPartitionBitSizeInput, int 
hashBitSizeInput, Paillier paillierInput,
-      HashMap<Integer,Integer> selectorQueryVecMappingInput, int startInput, 
int stopInput)
+  public EncryptQueryRunnable(int dataPartitionBitSizeInput, Paillier 
paillierInput, HashMap<Integer,Integer> selectorQueryVecMappingInput, int 
startInput,
+      int stopInput)
   {
     dataPartitionBitSize = dataPartitionBitSizeInput;
-    hashBitSize = hashBitSizeInput;
 
     paillier = paillierInput;
     selectorQueryVecMapping = selectorQueryVecMappingInput;
@@ -73,42 +71,20 @@ public class EncryptQueryRunnable implements Runnable
   @Override
   public void run()
   {
-    int i = start;
-    while (i <= stop)
+    for (int i = start; i <= stop; i++)
     {
-      if (selectorQueryVecMapping.containsKey(i))
+      Integer selectorNum = selectorQueryVecMapping.get(i);
+      BigInteger valToEnc = (selectorNum == null) ? BigInteger.ZERO : 
(BigInteger.valueOf(2)).pow(selectorNum * dataPartitionBitSize);
+      BigInteger encVal;
+      try
       {
-        int selectorNum = selectorQueryVecMapping.get(i);
-        BigInteger valToEnc = (BigInteger.valueOf(2)).pow(selectorNum * 
dataPartitionBitSize);
-
-        BigInteger encVal;
-        try
-        {
-          encVal = paillier.encrypt(valToEnc);
-        } catch (PIRException e)
-        {
-          e.printStackTrace();
-          throw new RuntimeException(e.toString());
-        }
-        encryptedValues.put(i, encVal);
-
-        logger.debug("selectorNum = " + selectorNum + " valToEnc = " + 
valToEnc + " envVal = " + encVal);
-      }
-      else
+        encVal = paillier.encrypt(valToEnc);
+      } catch (PIRException e)
       {
-        BigInteger encZero;
-        try
-        {
-          encZero = paillier.encrypt(BigInteger.ZERO);
-        } catch (PIRException e)
-        {
-          e.printStackTrace();
-          throw new RuntimeException(e.toString());
-        }
-        encryptedValues.put(i, encZero);
+        throw new RuntimeException(e);
       }
-      ++i;
+      encryptedValues.put(i, encVal);
+      logger.debug("selectorNum = " + selectorNum + " valToEnc = " + valToEnc 
+ " envVal = " + encVal);
     }
   }
-
 }

Reply via email to