http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java 
b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 4fee85a..f5d24d7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,6 +21,8 @@ package org.apache.pirk.responder.wideskies;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.cli.Option;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.schema.data.DataSchemaLoader;
@@ -76,10 +78,45 @@ public class ResponderProps
   public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
   public static final String STOPGRACEFULLY = 
"spark.streaming.stopGracefullyOnShutdown";
 
-  static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, 
DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
-      OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, 
USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, 
REDUCEMEMORY, MAPJAVAOPTS,
+  // Storm parameters
+  // hdfs
+  static final String HDFSURI = "hdfs.uri";
+  static final String USEHDFS = "hdfs.use";
+  // kafka
+  static final String KAFKATOPIC = "kafka.topic";
+  static final String KAFKACLIENTID = "kafka.clientId";
+  static final String KAFKAZK = "kafka.zk";
+  static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+  // pirk topo
+  static final String STORMTOPONAME = "storm.topoName";
+  static final String STORMWORKERS = "storm.workers";
+  static final String STORMNUMACKERS = "storm.numAckers";
+  static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+  static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+  static final String STORMTRANSFERBUFFERS = 
"storm.executor.transferBufferSize";
+  static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+  static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+  static final String STORMCHILDOPTS = "storm.worker.childOpts";
+  static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+  static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+  static final String STORMSPOUTPAR = "storm.spout.parallelism";
+  static final String STORMPARTITIONDATABOLTPAR = 
"storm.partitiondata.parallelism";
+  static final String STORMENCROWCALCBOLTPAR = 
"storm.encrowcalcbolt.parallelism";
+  static final String STORMENCCOLMULTBOLTPAR = 
"storm.enccolmultbolt.parallelism";
+  static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+  static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+  static final String STORMSALTCOLUMNS = "storm.saltColumns";
+  static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+  static final String[] STORMPROPS = new String[]{HDFSURI, USEHDFS, 
KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, 
STORMWORKERS,
+      STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, 
STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, 
STORMMAXWORKERHEAP,
+      STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, 
STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, 
STORMSPLITPARTITIONS,
+      STORMSALTCOLUMNS, STORMNUMROWDIVS};
+
+  static final List<String> PROPSLIST = Arrays.asList((String[]) 
ArrayUtils.addAll(new String[]{PLATFORM, QUERYINPUT, DATAINPUTFORMAT, 
INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE,
+      BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, 
LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
       REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, 
USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
-      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, 
WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);
+      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, 
WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS));
 
   /**
    * Validates the responder properties
@@ -98,7 +135,7 @@ public class ResponderProps
     }
 
     String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-    if (!platform.equals("mapreduce") && !platform.equals("spark") && 
!platform.equals("sparkstreaming") && !platform.equals("standalone"))
+    if (!platform.equals("mapreduce") && !platform.equals("spark") && 
!platform.equals("sparkstreaming") && !platform.equals("storm") && 
!platform.equals("standalone"))
     {
       logger.info("Unsupported platform: " + platform);
       valid = false;

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
 
b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0745bea..0050e29 100644
--- 
a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -44,7 +44,6 @@ import com.google.common.cache.LoadingCache;
 
 /**
  * Class to compute the encrypted row elements for a query from extracted data 
partitions
- * 
  */
 public class ComputeEncryptedRow
 {
@@ -98,7 +97,6 @@ public class ComputeEncryptedRow
    * Optionally uses a static LRU cache for the modular exponentiation
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> 
computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int 
rowIndex,
       boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) 
throws IOException
@@ -112,7 +110,7 @@ public class ComputeEncryptedRow
     int elementCounter = 0;
     for (BytesArrayWritable dataPartitions : dataPartitionsIter)
     {
-      logger.debug("rowIndex = " + rowIndex + " elementCounter = " + 
elementCounter);
+      logger.debug("rowIndex = {} elementCounter = {}", rowIndex, 
elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -121,7 +119,7 @@ public class ComputeEncryptedRow
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " 
rowIndex = " + rowIndex + " colCounter = " + colCounter);
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", 
dataPartitions.size(), rowIndex, colCounter);
 
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
@@ -142,8 +140,8 @@ public class ComputeEncryptedRow
         {
           e.printStackTrace();
         }
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter 
+ " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = 
"
-            + exp + " i = " + i + " partition = " + 
dataPartitions.getBigInteger(i) + " = " + 
dataPartitions.getBigInteger(i).toString(2));
+        logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} 
exp = {} i = {} partition = {} = {}",
+            rowIndex, colCounter, part.toString(), part.toString(2), exp, i, 
dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -162,7 +160,6 @@ public class ComputeEncryptedRow
    * Optionally uses a static LRU cache for the modular exponentiation
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> 
computeEncRowBI(Iterable<List<BigInteger>> dataPartitionsIter, Query query, int 
rowIndex,
       boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) 
throws IOException
@@ -178,7 +175,7 @@ public class ComputeEncryptedRow
     {
       // long startTime = System.currentTimeMillis();
 
-      logger.debug("rowIndex = " + rowIndex + " elementCounter = " + 
elementCounter);
+      logger.debug("rowIndex = {} elementCounter = {}", rowIndex, 
elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -188,8 +185,7 @@ public class ComputeEncryptedRow
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " 
rowIndex = " + rowIndex + " colCounter = " + colCounter);
-
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", 
dataPartitions.size(), rowIndex, colCounter);
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
       {
@@ -209,8 +205,9 @@ public class ComputeEncryptedRow
         {
           e.printStackTrace();
         }
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter 
+ " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = 
"
-            + exp + " i = " + i);
+
+        logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} 
exp = {} i = {}",
+            rowIndex, colCounter, part.toString(), part.toString(2), exp, i);
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -234,7 +231,6 @@ public class ComputeEncryptedRow
    * For each row (as indicated by key = hash(selector)), iterates over the 
dataPartitions and calculates the column values.
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> 
computeEncRowCacheInput(Iterable<List<BigInteger>> dataPartitionsIter, 
HashMap<Integer,BigInteger> cache,
       int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector) 
throws IOException
@@ -245,7 +241,7 @@ public class ComputeEncryptedRow
     int elementCounter = 0;
     for (List<BigInteger> dataPartitions : dataPartitionsIter)
     {
-      logger.debug("elementCounter = " + elementCounter);
+      logger.debug("elementCounter = {}", elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -254,7 +250,7 @@ public class ComputeEncryptedRow
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " 
rowIndex = " + rowIndex + " colCounter = " + colCounter);
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", 
dataPartitions.size(), rowIndex, colCounter);
 
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
@@ -262,7 +258,7 @@ public class ComputeEncryptedRow
         BigInteger part = dataPartitions.get(i);
         BigInteger exp = cache.get(part.intValue());
 
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter 
+ " part = " + part.toString() + " exp = " + exp + " i = " + i);
+        logger.debug("rowIndex = {} colCounter = {} part = {} exp = {} i = 
{}", rowIndex, colCounter, part.toString(), exp, i);
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -283,7 +279,6 @@ public class ComputeEncryptedRow
    * Caller is responsible for keeping track of the colIndex and the the 
maxHitsPerSelector values
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable 
dataPartitions, Query query, int rowIndex, int colIndex) throws IOException
   {
@@ -295,7 +290,7 @@ public class ComputeEncryptedRow
     // Initialize the column counter
     long colCounter = colIndex;
 
-    logger.debug("dataPartitions.size() = " + dataPartitions.size() + " 
rowIndex = " + rowIndex + " colCounter = " + colCounter);
+    logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", 
dataPartitions.size(), rowIndex, colCounter);
 
     // Update the associated column values
     for (int i = 0; i < dataPartitions.size(); ++i)
@@ -311,8 +306,8 @@ public class ComputeEncryptedRow
         e.printStackTrace();
       }
 
-      logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + 
" part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
-          + exp + " i = " + i + " partition = " + 
dataPartitions.getBigInteger(i) + " = " + 
dataPartitions.getBigInteger(i).toString(2));
+      logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} 
exp = {} i = {} partition = {} = {}",
+          rowIndex, colCounter, part.toString(), part.toString(2), exp, i, 
dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
 
       returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -321,4 +316,81 @@ public class ComputeEncryptedRow
 
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted 
data partitions in the form of ArrayList<<BigInteger>>
+   * <p>
+   * For each row (as indicated by key = hash(selector)), iterates over the 
dataPartitions and calculates the column values.
+   * <p>
+   * Uses a static LRU cache for the modular exponentiation
+   * <p>
+   * Caller is responsible for keeping track of the colIndex and the the 
maxHitsPerSelector values
+   * <p>
+   * Emits {@code Tuple2<<colNum, colVal>>}
+   */
+  public static List<Tuple2<Long,BigInteger>> computeEncRow(List<BigInteger> 
dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", 
dataPartitions, rowIndex, colCounter);
+
+    // Update the associated column values
+    for (int i = 0; i < dataPartitions.size(); ++i)
+    {
+      BigInteger part = dataPartitions.get(i);
+
+      BigInteger exp = null;
+      try
+      {
+        exp = expCache.get(new 
Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+      } catch (ExecutionException e)
+      {
+        e.printStackTrace();
+        break;
+      }
+
+      logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} 
exp = {} i = {} partition = {} = {}",
+          rowIndex, colCounter, part.toString(), part.toString(2), exp, i, 
dataPartitions.get(i), dataPartitions.get(i).toString(2));
+
+      returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+
+      ++colCounter;
+    }
+
+    return returnPairs;
+  }
+
+  public static List<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, 
Query query, int rowIndex, int colIndex) throws IOException
+  {
+    List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    // Update the associated column values
+    BigInteger exp = null;
+    try
+    {
+      exp = expCache.get(new 
Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+    } catch (ExecutionException e)
+    {
+      e.printStackTrace();
+    }
+
+    returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+
+    ++colCounter;
+
+    return returnPairs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
 
b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
index 61169f2..e605d94 100644
--- 
a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
@@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
- * Given a MapWritable dataElement, this class gives the common functionality 
to extract the selector by queryType from each dataElement, perform a keyed hash
- * of the selector, extract the partitions of the dataElement, and outputs 
{@code <hash(selector), dataPartitions>}
+ * Given a MapWritable or JSON formatted dataElement, this class gives the 
common functionality to extract the selector by queryType from each dataElement,
+ * perform a keyed hash of the selector, extract the partitions of the 
dataElement, and outputs {@code <hash(selector), dataPartitions>}
  */
 public class HashSelectorAndPartitionData
 {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java 
b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f34acf8..6014435 100644
--- 
a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -90,8 +90,9 @@ public class ComputeResponse
   private BroadcastVars bVars = null;
 
   private QueryInfo queryInfo = null;
-  Query query = null;
-  QuerySchema qSchema = null;
+
+  private Query query = null;
+  private QuerySchema qSchema = null;
 
   private int numDataPartitions = 0;
   private int numColMultPartitions = 0;

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
new file mode 100644
index 0000000..08c9917
--- /dev/null
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt class to perform encrypted column multiplication
+ * <p>
+ * Takes {@code <columnIndex, columnValue>} tuples as input and aggregates 
(multiplies) the columnValues for a given columnIndex as they are received.
+ * <p>
+ * EncRowCalcBolts send flush signals to the EncColMultBolts indicating that 
they have finished sending all tuples for a session. Whenever a flush signal is
+ * received from a EncRowCalcBolt, the num of received flush signals is 
tallied until each EncRowCalcBolt has emitted a flush signal.
+ * <p>
+ * Once a flush signal has been received from each EncRowCalcBolt, all {@code 
<columnIndex, aggregate colVal product>} tuples are sent to the OutputBolt and 
a session_end
+ * signal is sent back to each EncRowMultBolt.
+ * <p>
+ * The EncRowMultBolts buffer their output from the time that they send a 
flush signal to the EncColMultBolts until the time that they receive a 
session_end
+ * signal from all of the EncColMultBolts.
+ * 
+ */
+public class EncColMultBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(EncColMultBolt.class);
+
+  private OutputCollector outputCollector;
+
+  private BigInteger nSquared;
+  private long numFlushSignals;
+  private Long totalFlushSignals;
+
+  // This is the main object here. It holds column Id -> aggregated product
+  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+  private BigInteger colVal1;
+  private BigInteger colMult;
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector collector)
+  {
+    outputCollector = collector;
+    String nSquare = (String) map.get(StormConstants.N_SQUARED_KEY);
+    nSquared = new BigInteger(nSquare);
+    totalFlushSignals = (Long) 
map.get(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY);
+
+    logger.info("Initialized EncColMultBolt. ");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    if 
(tuple.getSourceStreamId().equals(StormConstants.ENCROWCALCBOLT_FLUSH_SIG))
+    {
+      numFlushSignals += 1;
+      logger.debug("Received  {} flush signals out of {}", numFlushSignals, 
totalFlushSignals);
+
+      // Need to receive notice from all EncRowCalcBolts in order to flush.
+      if (numFlushSignals == totalFlushSignals)
+      {
+        logger.debug("Received signal to flush in EncColMultBolt. Outputting 
{} results.", resultsMap.keySet().size());
+        for (Long key : resultsMap.keySet())
+          // key = column Id, value = aggregated product
+          outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new 
Values(key, resultsMap.get(key)));
+        resultsMap.clear();
+
+        // Send signal to OutputBolt to write output and notify EncRowCalcBolt 
that results have been flushed.
+        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new 
Long(-1), BigInteger.valueOf(0)));
+        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new 
Values(1));
+        numFlushSignals = 0;
+      }
+    }
+    else
+    {
+      // Data tuple received. Do column multiplication.
+
+      long colIndex = 
tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
+      colVal1 = (BigInteger) 
tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+
+      logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, 
colIndex);
+
+      if (resultsMap.containsKey(colIndex))
+      {
+        colMult = colVal1.multiply(resultsMap.get(colIndex));
+        resultsMap.put(colIndex, colMult.mod(nSquared));
+      }
+      else
+      {
+        resultsMap.put(colIndex, colVal1);
+      }
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID,
+        new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, 
StormConstants.COLUMN_PRODUCT_FIELD));
+    
outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, 
new Fields("finished"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
new file mode 100644
index 0000000..639a52b
--- /dev/null
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Bolt class to perform the encrypted row calculation
+ * <p>
+ * Receives a {@code <hash(selector), dataPartitions>} tuple as input.
+ * <p>
+ * Encrypts the row data and emits a (column index, encrypted row-value) tuple 
for each encrypted block.
+ * <p>
+ * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush 
its output and resets all counters. At that point, all outgoing (column index,
+ * encrypted row-value) tuples are buffered until a SESSION_END signal is 
received back from each EncColMultBolt.
+ */
+public class EncRowCalcBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(EncRowCalcBolt.class);
+
+  private OutputCollector outputCollector;
+  private static Query query;
+  private static boolean querySet = false;
+
+  private Boolean limitHitsPerSelector;
+  private Long maxHitsPerSelector;
+  private Long totalEndSigs;
+  private int rowDivisions;
+  private Boolean saltColumns;
+  private Boolean splitPartitions;
+
+  private Random rand;
+
+  // These are the main data structures used here.
+  private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
+  private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+  private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
+  private List<BigInteger> dataArray = new ArrayList<>();
+
+  private int numEndSigs = 0;
+
+  // These buffered values are used in the case when a session has been 
ejected, but the SESSION_END signal has not been received
+  // yet from the next bolt.
+  private boolean buffering = false;
+  private List<Tuple2<Long,BigInteger>> bufferedValues = new ArrayList<>();
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector coll)
+  {
+    outputCollector = coll;
+    setQuery(map);
+    logger.info("partition databolt hdfs = " + 
map.get(StormConstants.USE_HDFS));
+
+    maxHitsPerSelector = (Long) map.get(StormConstants.MAX_HITS_PER_SEL_KEY);
+    limitHitsPerSelector = (Boolean) 
map.get(StormConstants.LIMIT_HITS_PER_SEL_KEY);
+    totalEndSigs = (Long) 
map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY);
+    splitPartitions = (Boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+    saltColumns = (Boolean) map.get(StormConstants.SALT_COLUMNS_KEY);
+    rowDivisions = ((Long) 
map.get(StormConstants.ROW_DIVISIONS_KEY)).intValue();
+
+    // If splitPartitions==true, the data is incoming partition by partition, 
rather than record by record.
+    // The numRecords below will increment every partition elt exceed the 
maxHitsPerSelector param far too
+    // soon unless the latter is modified.
+    if (splitPartitions)
+      maxHitsPerSelector *= 
query.getQueryInfo().getNumPartitionsPerDataElement();
+
+    rand = new Random();
+
+    logger.info("Initialized EncRowCalcBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    if (tuple.getSourceStreamId().equals(StormConstants.DEFAULT))
+    {
+      matrixElements = processTupleFromPartitionDataBolt(tuple); // tuple: 
<hash,partitions>
+
+      if (buffering)
+      {
+        logger.debug("Buffering tuple.");
+        bufferedValues.addAll(matrixElements);
+      }
+      else
+      {
+        emitTuples(matrixElements);
+      }
+    }
+    else if (StormUtils.isTickTuple(tuple) && !buffering)
+    {
+      logger.debug("Sending flush signal to EncColMultBolt.");
+      outputCollector.emit(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new 
Values(1));
+
+      colIndexByRow.clear();
+      hitsByRow.clear();
+
+      buffering = true;
+    }
+    else if 
(tuple.getSourceStreamId().equals(StormConstants.ENCCOLMULTBOLT_SESSION_END))
+    {
+      numEndSigs += 1;
+      logger.debug("SessionEnd signal {} of {} received", numEndSigs, 
totalEndSigs);
+
+      // Need to receive signal from all EncColMultBolt instances before 
stopping buffering.
+      if (numEndSigs == totalEndSigs)
+      {
+        logger.debug("Buffering completed, emitting {} tuples.", 
bufferedValues.size());
+        emitTuples(bufferedValues);
+        bufferedValues.clear();
+        buffering = false;
+
+        numEndSigs = 0;
+      }
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    
outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+        new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, 
StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT));
+    
outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new 
Fields(StormConstants.FLUSH));
+  }
+
+  /***
+   * Extracts (hash, data partitions) from tuple. Encrypts the data 
partitions. Returns all of the pairs of (col index, col value). Also advances 
the
+   * colIndexByRow and hitsByRow appropriately.
+   *
+   * @param tuple
+   * @return
+   */
+  private List<Tuple2<Long,BigInteger>> 
processTupleFromPartitionDataBolt(Tuple tuple)
+  {
+    matrixElements.clear();
+    int rowIndex = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+
+    if (!colIndexByRow.containsKey(rowIndex))
+    {
+      colIndexByRow.put(rowIndex, 0);
+      hitsByRow.put(rowIndex, 0);
+    }
+
+    if (splitPartitions)
+    {
+      dataArray.add((BigInteger) 
tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD));
+    }
+    else
+    {
+      dataArray = (ArrayList<BigInteger>) 
tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD);
+    }
+    logger.debug("Retrieving {} elements in EncRowCalcBolt.", 
dataArray.size());
+
+    try
+    {
+      int colIndex = colIndexByRow.get(rowIndex);
+      int numRecords = hitsByRow.get(rowIndex);
+
+      if (limitHitsPerSelector && numRecords < maxHitsPerSelector)
+      {
+        logger.debug("computing matrix elements.");
+        matrixElements = ComputeEncryptedRow.computeEncRow(dataArray, query, 
rowIndex, colIndex);
+        colIndexByRow.put(rowIndex, colIndex + matrixElements.size());
+        hitsByRow.put(rowIndex, numRecords + 1);
+      }
+      else if (limitHitsPerSelector)
+      {
+        logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + 
numRecords);
+      }
+    } catch (IOException e)
+    {
+      logger.warn("Caught IOException while encrypting row. ", e);
+    }
+
+    dataArray.clear();
+    return matrixElements;
+  }
+
+  private void emitTuples(List<Tuple2<Long,BigInteger>> matrixElements)
+  {
+    // saltColumns distributes the column multiplication done in the next bolt 
EncColMultBolt to avoid hotspotting.
+    if (saltColumns)
+    {
+      for (Tuple2<Long,BigInteger> sTuple : matrixElements)
+      {
+        outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new 
Values(sTuple._1(), sTuple._2(), rand.nextInt(rowDivisions)));
+      }
+    }
+    else
+    {
+      for (Tuple2<Long,BigInteger> sTuple : matrixElements)
+      {
+        outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new 
Values(sTuple._1(), sTuple._2(), 0));
+      }
+    }
+  }
+
+  private synchronized static void setQuery(Map map)
+  {
+    if (!querySet)
+    {
+      query = StormUtils.prepareQuery(map);
+      querySet = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
new file mode 100644
index 0000000..68b02f3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ * <p>
+ * Receives {@code <colIndex, colProduct>} tuples, computes the final column 
product for each colIndex, records the results in the final Response object, and
+ * outputs the final Response object for the query.
+ * <p>
+ * Flush signals are sent to the OuputBolt from the EncColMultBolts via a 
tuple of the form {@code <-1, 0>}. Once a flush signal has been received from 
each
+ * EncColMultBolt (or a timeout is reached), the final column product is 
computed and the final Response is formed and emitted.
+ * <p>
+ * Currently, the Responses are written to HDFS to location specified by the 
outputFile with the timestamp appended.
+ * <p>
+ * TODO: -- Enable other Response output locations
+ * 
+ */
+public class OutputBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(OutputBolt.class);
+
+  private OutputCollector outputCollector;
+  private QueryInfo queryInfo;
+  private Response response;
+  private String outputFile;
+  private boolean hdfs;
+  private String hdfsUri;
+  private Integer flushCounter = 0;
+  private List<Tuple> tuplesToAck = new ArrayList<>();
+  private Integer totalFlushSigs;
+
+  private LocalFileSystemStore localStore;
+  private HadoopFileSystemStore hadoopStore;
+
+  // This latch just serves as a hook for testing.
+  public static CountDownLatch latch = new CountDownLatch(4);
+
+  // This is the main object here. It holds column Id -> product
+  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+
+  private BigInteger colVal;
+  private BigInteger colMult;
+
+  private BigInteger nSquared;
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector collector)
+  {
+    outputCollector = collector;
+
+    totalFlushSigs = ((Long) 
map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY)).intValue();
+    outputFile = (String) map.get(StormConstants.OUTPUT_FILE_KEY);
+
+    hdfs = (boolean) map.get(StormConstants.USE_HDFS);
+
+    if (hdfs)
+    {
+      hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+      try
+      {
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new 
Configuration());
+        hadoopStore = new HadoopFileSystemStore(fs);
+      } catch (IOException e)
+      {
+        logger.error("Failed to initialize Hadoop file system for output.");
+        throw new RuntimeException(e);
+      }
+    }
+    else
+    {
+      localStore = new LocalFileSystemStore();
+    }
+    nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    response = new Response(queryInfo);
+
+    logger.info("Intitialized OutputBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    long colIndex = 
tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
+    colVal = (BigInteger) 
tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+
+    // colIndex == -1 is just the signal sent by EncColMultBolt to notify that 
it flushed it's values.
+    // Could have created a new stream for such signals, but that seemed like 
overkill.
+    if (colIndex == -1)
+    {
+      flushCounter++;
+
+      logger.debug("Received " + flushCounter + " output flush signals out of 
" + totalFlushSigs);
+
+      // Wait till all EncColMultBolts have been flushed
+      if (flushCounter == totalFlushSigs)
+      {
+        logger.info("TimeToFlush reached - outputting response to " + 
outputFile + " with columns.size = " + resultsMap.keySet().size());
+        try
+        {
+          String timestamp = (new 
SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())).toString();
+          for (long cv : resultsMap.keySet())
+          {
+            response.addElement((int) cv, resultsMap.get(cv));
+          }
+
+          if (hdfs)
+          {
+            hadoopStore.store(new Path(outputFile + "_" + timestamp), 
response);
+          }
+          else
+          { // In order to accommodate testing, this does not currently 
include timestamp.
+            // Should probably be fixed, but this will not likely be used 
outside of testing.
+            localStore.store(new File(outputFile), response);
+            for (long cv : resultsMap.keySet())
+            {
+              response.addElement((int) cv, resultsMap.get(cv));
+              logger.debug("column = " + cv + ", value = " + 
resultsMap.get(cv).toString());
+            }
+          }
+        } catch (IOException e)
+        {
+          logger.warn("Unable to write output file.");
+        }
+
+        // Reset
+        resultsMap.clear();
+        flushCounter = 0;
+        for (Tuple t : tuplesToAck)
+          outputCollector.ack(t);
+        // Used for integration test
+        latch.countDown();
+      }
+    }
+    else
+    {
+      // Process data values: add them to map. The column multiplication is 
only done in the case where saltColumns==true,
+      // in which case a small number of multiplications still need to be done 
per column.
+      if (resultsMap.containsKey(colIndex))
+      {
+        colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+        resultsMap.put(colIndex, colMult);
+      }
+      else
+      {
+        resultsMap.put(colIndex, colVal);
+      }
+      logger.debug("column = " + colIndex + ", value = " + 
resultsMap.get(colIndex).toString());
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
new file mode 100644
index 0000000..9d24620
--- /dev/null
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bolt to extract the partitions of the data record and output {@code 
<hash(selector), dataPartitions>}
+ * <p>
+ * Currently receives a {@code <hash(selector), JSON data record>} as input.
+ * <p>
+ *
+ */
+public class PartitionDataBolt extends BaseBasicBolt
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(PartitionDataBolt.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private QueryInfo queryInfo;
+  private String queryType;
+  private QuerySchema qSchema = null;
+
+  private boolean embedSelector;
+
+  private boolean splitPartitions;
+
+  private JSONObject json;
+  private List<BigInteger> partitions;
+
+  @Override
+  public void prepare(Map map, TopologyContext context)
+  {
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    queryType = queryInfo.getQueryType();
+    embedSelector = queryInfo.getEmbedSelector();
+    logger.info("partition databolt hdfs = " + 
map.get(StormConstants.USE_HDFS));
+    StormUtils.initializeSchemas(map, "partition");
+    try
+    {
+      if ((boolean) map.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY))
+      {
+        qSchema = queryInfo.getQuerySchema();
+      }
+      if (qSchema == null)
+      {
+        qSchema = QuerySchemaRegistry.get(queryType);
+      }
+    } catch (Exception e)
+    {
+      logger.error("Unable to initialize schemas in PartitionDataBolt. ", e);
+    }
+
+    json = new JSONObject();
+    splitPartitions = (boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+
+    logger.info("Initialized ExtractAndPartitionDataBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector outputCollector)
+  {
+    int hash = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+    json = (JSONObject) tuple.getValueByField(StormConstants.JSON_DATA_FIELD);
+
+    try
+    {
+      partitions = QueryUtils.partitionDataElement(qSchema, json, 
embedSelector);
+
+      logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting 
results - {}", json.toString(), partitions.size());
+
+      // splitPartitions determines whether each partition piece is sent 
individually or the full Array is sent together.
+      // Since processing in the follow-on bolt (EncRowCalcBolt) is 
computationally expensive, current working theory is
+      // that splitting them up allows for better throughput. Though maybe 
with better knowledge/tuning of Storm internals
+      // and paramters (e.g. certain buffer sizes), it may make no difference.
+      if (splitPartitions)
+      {
+        for (BigInteger partition : partitions)
+        {
+          outputCollector.emit(new Values(hash, partition));
+        }
+      }
+      else
+      {
+        outputCollector.emit(new Values(hash, partitions));
+      }
+
+    } catch (Exception e)
+    {
+      logger.warn("Failed to partition data for record -- " + json + "\n", e);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    outputFieldsDeclarer.declare(new Fields(StormConstants.HASH_FIELD, 
StormConstants.PARTIONED_DATA_FIELD));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
new file mode 100644
index 0000000..76bb80c
--- /dev/null
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.utils.KeyedHash;
+
+import org.apache.storm.Config;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheme used by spout to retrieve and hash selector from JSON data on Kafka.
+ */
+public class PirkHashScheme extends StringScheme implements Scheme
+{
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(PirkHashScheme.class);
+
+  private QueryInfo queryInfo;
+
+  transient private JSONParser parser;
+  transient private JSONObject json;
+  private boolean initialized = false;
+  private QuerySchema qSchema;
+  private Config conf;
+
+  public PirkHashScheme(Config conf)
+  {
+    this.conf = conf;
+  }
+
+  public List<Object> deserialize(ByteBuffer bytes)
+  {
+    if (!initialized)
+    {
+      parser = new JSONParser();
+      queryInfo = new QueryInfo((Map) conf.get(StormConstants.QUERY_INFO_KEY));
+
+      StormUtils.initializeSchemas(conf, "hashScheme");
+
+      if ((boolean) conf.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY))
+      {
+        qSchema = queryInfo.getQuerySchema();
+      }
+      if (qSchema == null)
+      {
+        qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType());
+      }
+
+      initialized = true;
+    }
+    String str = super.deserializeString(bytes);
+
+    try
+    {
+      json = (JSONObject) parser.parse(str);
+    } catch (ParseException e)
+    {
+      json = null;
+      logger.warn("ParseException parsing " + str, e);
+    }
+    String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json);
+    int hash = KeyedHash.hash(queryInfo.getHashKey(), 
queryInfo.getHashBitSize(), selector);
+
+    return new Values(hash, json);
+  }
+
+  public Fields getOutputFields()
+  {
+    return new Fields(StormConstants.HASH_FIELD, 
StormConstants.JSON_DATA_FIELD);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
new file mode 100644
index 0000000..ddfca8b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.*;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storm topology class for wideskies Pirk implementation
+ * <p>
+ * 
+ */
+public class PirkTopology
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(PirkTopology.class);
+
+  private static final String kafkaClientId = 
SystemConfiguration.getProperty("kafka.clientId", "KafkaSpout");
+  private static final String brokerZk = 
SystemConfiguration.getProperty("kafka.zk", "localhost:2181");
+  private static final String kafkaTopic = 
SystemConfiguration.getProperty("kafka.topic", "pirkTopic");
+  private static final Boolean forceFromStart = 
Boolean.parseBoolean(SystemConfiguration.getProperty("kafka.forceFromStart", 
"false"));
+
+  private static final Boolean useHdfs = 
Boolean.parseBoolean(SystemConfiguration.getProperty("hdfs.use", "true"));
+  private static final String hdfsUri = 
SystemConfiguration.getProperty("hdfs.uri", "localhost");
+
+  private static final String topologyName = 
SystemConfiguration.getProperty("storm.topoName", "PirkTopology");
+  private static final Integer numWorkers = 
Integer.parseInt(SystemConfiguration.getProperty("storm.workers", "1"));
+
+  private static final Integer spoutParallelism = 
Integer.parseInt(SystemConfiguration.getProperty("storm.spout.parallelism", 
"1"));
+  private static final Integer partitionDataBoltParallelism = 
Integer.parseInt(SystemConfiguration.getProperty("storm.partitiondata.parallelism",
 "1"));
+  private static final Integer encrowcalcboltParallelism = 
Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.parallelism",
 "1"));
+  private static final Integer enccolmultboltParallelism = 
Integer.parseInt(SystemConfiguration.getProperty("storm.enccolmultbolt.parallelism",
 "1"));
+
+  private static final Boolean saltColumns = 
Boolean.parseBoolean(SystemConfiguration.getProperty("storm.saltColumns", 
"false"));
+  private static final Boolean splitPartitions = 
Boolean.parseBoolean(SystemConfiguration.getProperty("storm.splitPartitions", 
"false"));
+
+  private static final String queryFile = 
SystemConfiguration.getProperty("pir.queryInput");
+  private static final String outputPath = 
SystemConfiguration.getProperty("pir.outputFile");
+
+  public static void runPirkTopology() throws Exception
+  {
+    // Set up Kafka parameters
+    logger.info("Configuring Kafka.");
+    String zkRoot = "/" + kafkaTopic + "_pirk_storm";
+    BrokerHosts zkHosts = new ZkHosts(brokerZk);
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, kafkaTopic, zkRoot, 
kafkaClientId);
+    kafkaConfig.ignoreZkOffsets = forceFromStart;
+
+    // Create conf
+    logger.info("Retrieving Query and generating Storm conf.");
+    Config conf = createStormConf();
+    Query query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+    conf.put(StormConstants.N_SQUARED_KEY, query.getNSquared().toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, query.getQueryInfo().toMap());
+
+    // Configure this for different types of input data on Kafka.
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+
+    // Create topology
+    StormTopology topology = getPirkTopology(kafkaConfig);
+
+    // Run topology
+    logger.info("Submitting Pirk topology to Storm...");
+    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+
+  } // main
+
+  /***
+   * Creates Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt 
-> EncColMultBolt -> OutputBolt Requires KafkaConfig to initialize KafkaSpout.
+   *
+   * @param kafkaConfig
+   * @return
+   */
+  public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
+  {
+    // Create spout and bolts
+    KafkaSpout spout = new KafkaSpout(kafkaConfig);
+    PartitionDataBolt partitionDataBolt = new PartitionDataBolt();
+    EncRowCalcBolt ercbolt = new EncRowCalcBolt();
+    EncColMultBolt ecmbolt = new EncColMultBolt();
+    OutputBolt outputBolt = new OutputBolt();
+
+    // Build Storm topology
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism);
+
+    builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, 
partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID,
+        new Fields(StormConstants.HASH_FIELD));
+
+    // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid 
of b2 and b3).
+    BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, 
ercbolt, encrowcalcboltParallelism)
+        .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new 
Fields(StormConstants.HASH_FIELD))
+        .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, 
StormConstants.ENCCOLMULTBOLT_SESSION_END)
+        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));
+
+    // b2.setMemoryLoad(5000);
+    // b2.setCPULoad(150.0);
+
+    BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, 
ecmbolt, enccolmultboltParallelism)
+        .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, 
StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+            new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, 
StormConstants.SALT))
+        .allGrouping(StormConstants.ENCROWCALCBOLT_ID, 
StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
+    // b3.setMemoryLoad(5000);
+    // b3.setCPULoad(500.0);
+
+    builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 
1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, 
StormConstants.ENCCOLMULTBOLT_ID);
+
+    return builder.createTopology();
+  }
+
+  public static Config createStormConf()
+  {
+
+    Boolean limitHitsPerSelector = 
Boolean.parseBoolean(SystemConfiguration.getProperty("pir.limitHitsPerSelector"));
+    Integer maxHitsPerSelector = 
Integer.parseInt(SystemConfiguration.getProperty("pir.maxHitsPerSelector"));
+    Integer rowDivisions = 
Integer.parseInt(SystemConfiguration.getProperty("storm.rowDivs", "1"));
+
+    Config conf = new Config();
+    
conf.setNumAckers(Integer.parseInt(SystemConfiguration.getProperty("storm.numAckers",
 numWorkers.toString())));
+    
conf.setMaxSpoutPending(Integer.parseInt(SystemConfiguration.getProperty("storm.maxSpoutPending",
 "300")));
+    conf.setNumWorkers(numWorkers);
+    conf.setDebug(false);
+    // conf.setNumEventLoggers(2);
+
+    conf.put(conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 
Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize",
 "1024")));
+    conf.put(conf.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 
Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize",
 "1024")));
+    conf.put(conf.TOPOLOGY_TRANSFER_BUFFER_SIZE, 
Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", 
"32")));
+    conf.put(conf.WORKER_HEAP_MEMORY_MB, 
Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", 
"750")));
+    conf.put(conf.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", 
"128")));
+
+    // Pirk parameters to send to bolts
+    conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, 
SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", 
"false").equals("true"));
+    conf.put(StormConstants.QSCHEMA_KEY, 
SystemConfiguration.getProperty("query.schemas"));
+    conf.put(StormConstants.DSCHEMA_KEY, 
SystemConfiguration.getProperty("data.schemas"));
+    conf.put(StormConstants.HDFS_URI_KEY, hdfsUri);
+    conf.put(StormConstants.QUERY_FILE_KEY, queryFile);
+    conf.put(StormConstants.USE_HDFS, useHdfs);
+    conf.put(StormConstants.OUTPUT_FILE_KEY, outputPath);
+    conf.put(StormConstants.LIMIT_HITS_PER_SEL_KEY, limitHitsPerSelector);
+    conf.put(StormConstants.MAX_HITS_PER_SEL_KEY, maxHitsPerSelector);
+    conf.put(StormConstants.SPLIT_PARTITIONS_KEY, splitPartitions);
+    conf.put(StormConstants.SALT_COLUMNS_KEY, saltColumns);
+    conf.put(StormConstants.ROW_DIVISIONS_KEY, rowDivisions);
+    conf.put(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY, 
encrowcalcboltParallelism);
+    conf.put(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY, 
enccolmultboltParallelism);
+
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
new file mode 100644
index 0000000..7f1e59d
--- /dev/null
+++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+public class StormConstants
+{
+  // Topology Components
+  public static final String SPOUT_ID = "kafkaspout";
+  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  public static final String OUTPUTBOLT_ID = "outputbolt";
+
+  // Extra Streams
+  public static final String DEFAULT = "default";
+  public static final String ENCROWCALCBOLT_DATASTREAM_ID = 
"encrowcalcbolt_datastream_id";
+  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  public static final String ENCCOLMULTBOLT_SESSION_END = 
"enccolmultbolt_sess_end";
+
+  // Tuple Fields
+  // From HashBolt (and variants)
+  public static final String HASH_FIELD = "hash";
+  public static final String PARTIONED_DATA_FIELD = "parData";
+  public static final String JSON_DATA_FIELD = "data";
+  // From EncRowCalcBolt
+  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  // From EncColMultBolt
+  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+
+  // Configuration Keys
+  public static final String USE_HDFS = "useHdfs";
+  public static final String HDFS_URI_KEY = "hdfsUri";
+  public static final String QUERY_FILE_KEY = "queryFile";
+  public static final String QUERY_INFO_KEY = "queryInfo";
+  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = 
"allowAdHocQuerySchemas";
+  public static final String QSCHEMA_KEY = "qSchema";
+  public static final String DSCHEMA_KEY = "dschema";
+  public static final String OUTPUT_FILE_KEY = "output";
+  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  public static final String SALT_COLUMNS_KEY = "saltColumns";
+  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  public static final String N_SQUARED_KEY = "nSquared";
+  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = 
"encrowcalcboltPar";
+  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = 
"enccolmultboltPar";
+
+  public static final String SALT = "salt";
+  public static final String FLUSH = "flush";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
new file mode 100644
index 0000000..7fbca66
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Utils class for the Storm implementation of Wideskies
+ */
+public class StormUtils
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(StormUtils.class);
+
+  /**
+   * Method to read in serialized Query object from the given queryFile
+   * 
+   * @param useHdfs
+   * @param hdfsUri
+   * @param queryFile
+   * @return
+   */
+  public static Query getQuery(boolean useHdfs, String hdfsUri, String 
queryFile)
+  {
+    Query query = null;
+
+    try
+    {
+      if (useHdfs)
+      {
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new 
Configuration());
+        logger.info("reading query file from hdfs: " + queryFile);
+        query = (new HadoopFileSystemStore(fs)).recall(queryFile, Query.class);
+      }
+      else
+      {
+        logger.info("reading local query file from " + queryFile);
+        query = (new LocalFileSystemStore()).recall(queryFile, Query.class);
+      }
+    } catch (Exception e)
+    {
+      logger.error("Unable to initalize query info.", e);
+      throw new RuntimeException(e);
+    }
+    return query;
+  }
+
+  /**
+   * Method to read in and return a serialized Query object from the given 
file and initialize/load the query.schemas and data.schemas
+   * 
+   * @param map
+   * @return
+   */
+  public static Query prepareQuery(Map map)
+  {
+    Query query = null;
+
+    boolean useHdfs = (boolean) map.get(StormConstants.USE_HDFS);
+    String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+    String queryFile = (String) map.get(StormConstants.QUERY_FILE_KEY);
+    try
+    {
+      query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+
+    } catch (Exception e)
+    {
+      logger.warn("Unable to initialize query info.", e);
+    }
+
+    return query;
+  }
+
+  /***
+   * Initialize data and query schema. Conf requires values for USE_HDFS, 
HDFS_URI_KEY, DSCHEMA_KEY, and QSCHEMA_KEY
+   * 
+   * @param conf
+   */
+  public static void initializeSchemas(Map conf, String id)
+  {
+    SystemConfiguration.setProperty("data.schemas", (String) 
conf.get(StormConstants.DSCHEMA_KEY));
+    SystemConfiguration.setProperty("query.schemas", (String) 
conf.get(StormConstants.QSCHEMA_KEY));
+
+    try
+    {
+      boolean hdfs = (boolean) conf.get(StormConstants.USE_HDFS);
+      if (hdfs)
+      {
+        String hdfsUri = (String) conf.get(StormConstants.HDFS_URI_KEY);
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new 
Configuration());
+        DataSchemaLoader.initialize(true, fs);
+        QuerySchemaLoader.initialize(true, fs);
+      }
+      else
+      {
+        DataSchemaLoader.initialize();
+        QuerySchemaLoader.initialize();
+      }
+    } catch (Exception e)
+    {
+      logger.error("Failed to initialize schema files.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static boolean isTickTuple(Tuple tuple)
+  {
+    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && 
tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java 
b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 28de96e..c651eaa 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,7 +31,9 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;
@@ -131,8 +133,8 @@ public class DataSchemaLoader
       InputStream is = null;
       if (hdfs)
       {
-        is = fs.open(new Path(schemaFile));
         logger.info("hdfs: filePath = " + schemaFile);
+        is = fs.open(fs.makeQualified(new Path(schemaFile)));
       }
       else
       {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/test/utils/BaseTests.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java 
b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index 962e467..b0c9326 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -77,8 +77,6 @@ public class BaseTests
   {
     logger.info("Running testDNSHostnameQuery(): ");
 
-    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
-
     int numExpectedResults = 6;
     List<QueryResponseJSON> results;
     if (isDistributed)
@@ -93,6 +91,14 @@ public class BaseTests
         numExpectedResults = 7; // all 7 for non distributed case; if 
testFalsePositive==true, then 6
       }
     }
+    checkDNSHostnameQueryResults(results, isDistributed, numExpectedResults, 
testFalsePositive, dataElements);
+    logger.info("Completed testDNSHostnameQuery(): ");
+  }
+
+  public static void checkDNSHostnameQueryResults(List<QueryResponseJSON> 
results, boolean isDistributed, int numExpectedResults,
+      boolean testFalsePositive, List<JSONObject> dataElements)
+  {
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
     logger.info("results:");
     printResultList(results);
 
@@ -188,7 +194,6 @@ public class BaseTests
         }
       }
     }
-    logger.info("Completed testDNSHostnameQuery(): ");
   }
 
   public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int 
numThreads) throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 8f82f1c..7501aaa 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -46,4 +46,4 @@
                </Root>
        </Loggers>
 
-</Configuration>
\ No newline at end of file
+</Configuration>

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/pirk.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/pirk.properties 
b/src/main/resources/pirk.properties
index 963fa34..a88c846 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -228,9 +228,3 @@ pir.expDir = none
                
 #Parallelism for expLookupTable creation in hdfs 
 pir.expCreationSplits = 600
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/responder.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/responder.properties 
b/src/main/resources/responder.properties
index 3ae92c7..ac6cb35 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -154,4 +154,47 @@ pir.queryInput=
 #default is false
 #spark.streaming.stopGracefullyOnShutdown=
 
- 
\ No newline at end of file
+ ##Properties for Kafka
+ #kafka.topic = topicName
+ #kafka.clientId = pirk_spout
+
+ # Kafka Zookeepers
+ #kafka.zk = localhost:2181
+ # Read from beginning of Kafka topic on startup
+ #kafka.forceFromStart = false
+
+
+ ##Properties for Storm
+ #storm.topoName = pir
+ #storm.workers = 1
+ #storm.numAckers = 1
+ #storm.maxSpoutPending=10
+ #storm.worker.heapMemory=6000
+ #storm.componentOnheapMem= 600.0
+
+ # This should be set to the number of Kafka partitions
+ #storm.spout.parallelism = 1
+
+ #storm.hashbolt.parallelism = 1
+ #storm.encrowcalcbolt.parallelism = 1
+ # This bolt is most computationally expensive and should have the highest 
value
+ #storm.enccolmultbolt.parallelism = 2
+
+ # These may be useful for tuning
+ #storm.executor.receiveBufferSize = 1024
+ #storm.executor.sendBufferSize = 1024
+ #storm.transferBufferSize = 8
+
+ # Frequency with which PIR matrix elements are flushed out
+ #storm.encrowcalcbolt.ticktuple = 60
+
+ # Design configurations:
+ # Hashbolt emits individual tuples for each data partition when 
splitPartitions =true
+ # emits the batch of data partitions for a record in a single tuple when 
=false
+ #storm.splitPartitions = true
+ # A task running EncColMultBolt will only be responsible for multiplying a 
subset of the row
+ # for any individual column when saltColumns = true
+ # All multiplication for a single column is done on a single EncColMultBolt 
instance when = false
+ #storm.saltColumns = true
+ # Only makes sense to tune if saltColumns=true
+ #storm.rowDivs = 1

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java 
b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
new file mode 100644
index 0000000..906c337
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.QueryResultsWriter;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new 
LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  private static int testCountDown = 4;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.getProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "8");
+    SystemConfiguration.setProperty("storm.rowDivs", "2");
+    SystemConfiguration.setProperty("hdfs.use", "false");
+
+    startZookeeper();
+    startKafka();
+
+    SystemConfiguration.setProperty("kafka.topic", topic);
+    SystemConfiguration.setProperty("storm.topoName", "pirTest");
+
+    // Create encrypted file
+    SystemConfiguration.setProperty("pir.stopListFile", "none");
+    Inputs.createSchemaFiles(StopListFilter.class.getName());
+
+    // Perform encryption. Set queryInfo, nSquared, fileQuery, and fileQuerier
+    performEncryption();
+    SystemConfiguration.setProperty("pir.queryInput", 
fileQuery.getAbsolutePath());
+
+    KafkaProducer producer = new 
KafkaProducer<String,String>(createKafkaProducerConfig());
+    loadTestData(producer);
+
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+  }
+
+  private void runTest() throws Exception
+  {
+    File responderFile = File.createTempFile("responderFile", ".txt");
+    logger.info("Starting topology.");
+    runTopology(responderFile);
+
+    // decrypt results
+    logger.info("Decrypting results. " + responderFile.length());
+    File fileFinalResults = performDecryption(responderFile);
+
+    // check results
+    List<QueryResponseJSON> results = 
TestUtils.readResultsFile(fileFinalResults);
+    BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, 
Inputs.createJSONDataElements());
+
+    responderFile.deleteOnExit();
+    fileFinalResults.deleteOnExit();
+  }
+
+  private void runTopology(File responderFile) throws Exception
+  {
+    MkClusterParam mkClusterParam = new MkClusterParam();
+    // The test sometimes fails because of timing issues when more than 1 
supervisor set.
+    mkClusterParam.setSupervisors(1);
+
+    // Maybe using "withSimulatedTimeLocalCluster" would be better to avoid 
worrying about timing.
+    Config conf = PirkTopology.createStormConf();
+    conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath());
+    conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap());
+    // conf.setDebug(true);
+    mkClusterParam.setDaemonConf(conf);
+
+    TestJob testJob = createPirkTestJob(conf);
+    Testing.withLocalCluster(mkClusterParam, testJob);
+    // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob);
+  }
+
+  private TestJob createPirkTestJob(final Config config)
+  {
+    final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config);
+    return new TestJob()
+    {
+      StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig);
+
+      @Override
+      public void run(ILocalCluster iLocalCluster) throws Exception
+      {
+        iLocalCluster.submitTopology("pirk_integration_test", config, 
topology);
+        logger.info("Pausing for setup.");
+        //Thread.sleep(4000);
+        //KafkaProducer producer = new 
KafkaProducer<String,String>(createKafkaProducerConfig());
+        //loadTestData(producer);
+        //Thread.sleep(10000);
+        while(OutputBolt.latch.getCount() == testCountDown){
+          Thread.sleep(1000);
+        }
+        testCountDown -=1;
+
+        logger.info("Finished...");
+      }
+    };
+  }
+
+  private SpoutConfig setUpTestKafkaSpout(Config conf)
+  {
+    ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString());
+
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, 
"/pirk_test_root", "pirk_integr_test_spout");
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+    logger.info("KafkaConfig initialized...");
+
+    return kafkaConfig;
+  }
+
+  private void startZookeeper() throws Exception
+  {
+    logger.info("Starting zookeeper.");
+    zookeeperLocalCluster = new TestingServer();
+    zookeeperLocalCluster.start();
+    logger.info("Zookeeper initialized.");
+
+  }
+
+  private void startKafka() throws Exception
+  {
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    Properties props = new Properties();
+    props.setProperty("zookeeper.session.timeout.ms", "100000");
+    props.put("advertised.host.name", "localhost");
+    props.put("port", 11111);
+    // props.put("broker.id", "0");
+    props.put("log.dir", kafkaTmpDir);
+    props.put("enable.zookeeper", "true");
+    props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
+    KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
+    kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), 
scala.Option.apply("kafkaThread"));
+    kafkaLocalBroker.startup();
+
+    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 
60000, ZKStringSerializer$.MODULE$);
+    ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zookeeperLocalCluster.getConnectString()), false);
+    //ZkUtils zkUtils = 
ZkUtils.apply(zookeeperLocalCluster.getConnectString(), 60000, 60000, false);
+    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception
+  {
+    zkClient.close();
+    kafkaLocalBroker.shutdown();
+    zookeeperLocalCluster.stop();
+
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    fileQuery.delete();
+    fileQuerier.delete();
+
+  }
+
+  private HashMap<String,Object> createKafkaProducerConfig()
+  {
+    String kafkaHostName = "localhost";
+    Integer kafkaPorts = 11111;
+    HashMap<String,Object> config = new HashMap<String,Object>();
+    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + 
kafkaPorts);
+    config.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    config.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+
+    return config;
+  }
+
+  private void loadTestData(KafkaProducer producer)
+  {
+    for (JSONObject dataRecord : Inputs.createJSONDataElements())
+    {
+      logger.info("Sending record to Kafka " + dataRecord.toString());
+      producer.send(new ProducerRecord<String,String>(topic, 
dataRecord.toString()));
+    }
+  }
+
+  private void performEncryption() throws Exception
+  {
+    // ArrayList<String> selectors = BaseTests.selectorsDomain;
+    List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", 
"d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+    String queryType = Inputs.DNS_HOSTNAME_QUERY;
+
+    Paillier paillier = new Paillier(BaseTests.paillierBitSize, 
BaseTests.certainty);
+
+    nSquared = paillier.getNSquared();
+
+    queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), 
BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, 
queryType,
+        false, true, false);
+
+    // Perform the encryption
+    logger.info("Performing encryption of the selectors - forming encrypted 
query vectors:");
+    EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, 
paillier);
+    Querier querier = encryptQuery.encrypt(1);
+    logger.info("Completed encryption of the selectors - completed formation 
of the encrypted query vectors:");
+
+    // Write out files.
+    fileQuerier = File.createTempFile("pir_integrationTest-" + 
QuerierConst.QUERIER_FILETAG, ".txt");
+    fileQuery = File.createTempFile("pir_integrationTest-" + 
QuerierConst.QUERY_FILETAG, ".txt");
+
+    localStore.store(fileQuerier.getAbsolutePath(), querier);
+    localStore.store(fileQuery, querier.getQuery());
+  }
+
+  private File performDecryption(File responseFile) throws Exception
+  {
+    File finalResults = File.createTempFile("finalFileResults", ".txt");
+    String querierFilePath = fileQuerier.getAbsolutePath();
+    String responseFilePath = responseFile.getAbsolutePath();
+    String outputFile = finalResults.getAbsolutePath();
+    int numThreads = 1;
+
+    Response response = localStore.recall(responseFilePath, Response.class);
+    Querier querier = localStore.recall(querierFilePath, Querier.class);
+
+    // Perform decryption and output the result file
+    DecryptResponse decryptResponse = new DecryptResponse(response, querier);
+    decryptResponse.decrypt(numThreads);
+    QueryResultsWriter.writeResultFile(outputFile, 
decryptResponse.decrypt(numThreads));
+    return finalResults;
+  }
+
+}


Reply via email to