Repository: incubator-pirk Updated Branches: refs/heads/master acc0c314b -> d992fa293
Enhancements to partitioners, keyedHash, and prime generator, this closes apache/incubator-pirk#64 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/d992fa29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/d992fa29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/d992fa29 Branch: refs/heads/master Commit: d992fa293d00b300681108980b87d51332c32d85 Parents: acc0c31 Author: Tim Ellison <[email protected]> Authored: Tue Aug 16 08:36:52 2016 -0400 Committer: smarthi <[email protected]> Committed: Tue Aug 16 08:36:52 2016 -0400 ---------------------------------------------------------------------- .../apache/pirk/encryption/PrimeGenerator.java | 14 ++- .../inputformat/hadoop/BytesArrayWritable.java | 14 +-- .../apache/pirk/querier/wideskies/Querier.java | 7 +- .../wideskies/decrypt/DecryptResponse.java | 3 +- .../querier/wideskies/encrypt/EncryptQuery.java | 7 +- .../apache/pirk/query/wideskies/QueryUtils.java | 37 +++--- .../wideskies/common/ComputeEncryptedRow.java | 45 ++++---- .../common/HashSelectorAndPartitionData.java | 30 ++--- .../wideskies/mapreduce/RowCalcReducer.java | 11 +- .../wideskies/spark/ComputeResponse.java | 7 +- .../responder/wideskies/spark/EncRowCalc.java | 9 +- .../spark/EncRowCalcPrecomputedCache.java | 11 +- .../spark/HashSelectorsAndPartitionData.java | 11 +- .../wideskies/standalone/Responder.java | 21 ++-- .../partitioner/PrimitiveTypePartitioner.java | 28 +++-- .../test/distributed/DistributedTestDriver.java | 9 +- .../distributed/testsuite/DistTestSuite.java | 8 +- .../org/apache/pirk/test/utils/BaseTests.java | 30 ++--- .../java/org/apache/pirk/test/utils/Inputs.java | 5 +- .../apache/pirk/test/utils/StandaloneQuery.java | 17 ++- .../java/org/apache/pirk/utils/KeyedHash.java | 13 +-- .../apache/pirk/general/PartitionUtilsTest.java | 114 ++++++++----------- 22 files changed, 214 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/encryption/PrimeGenerator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/encryption/PrimeGenerator.java b/src/main/java/org/apache/pirk/encryption/PrimeGenerator.java index ddfbd00..d45cf56 100644 --- a/src/main/java/org/apache/pirk/encryption/PrimeGenerator.java +++ b/src/main/java/org/apache/pirk/encryption/PrimeGenerator.java @@ -18,15 +18,15 @@ */ package org.apache.pirk.encryption; +import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigDecimal; import java.math.BigInteger; import java.util.HashMap; import java.util.Random; -import org.apache.pirk.utils.SystemConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Class to generate the primes used in the Paillier cryptosystem * <p> @@ -55,6 +55,8 @@ public class PrimeGenerator { private static final Logger logger = LoggerFactory.getLogger(PrimeGenerator.class); + private static final BigDecimal SQRT_2 = BigDecimal.valueOf(Math.sqrt(2)); + private static final HashMap<Integer,BigInteger> lowerBoundCache = new HashMap<>(); private static final HashMap<Integer,BigInteger> minimumDifferenceCache = new HashMap<>(); @@ -88,7 +90,7 @@ public class PrimeGenerator BigInteger lowerBound; if (!lowerBoundCache.containsKey(bitLength)) { - lowerBound = BigDecimal.valueOf(Math.sqrt(2)).multiply(BigDecimal.valueOf(2).pow((bitLength / 2) - 1)).toBigInteger(); + lowerBound = SQRT_2.multiply(BigDecimal.valueOf(2).pow((bitLength / 2) - 1)).toBigInteger(); lowerBoundCache.put(bitLength, lowerBound); } else @@ -149,7 +151,7 @@ public class PrimeGenerator BigInteger lowerBound; if (!lowerBoundCache.containsKey(bitLength)) { - lowerBound = BigDecimal.valueOf(Math.sqrt(2)).multiply(BigDecimal.valueOf(2).pow((bitLength / 2) - 1)).toBigInteger(); + lowerBound = SQRT_2.multiply(BigDecimal.valueOf(2).pow((bitLength / 2) - 1)).toBigInteger(); lowerBoundCache.put(bitLength, lowerBound); } else http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/inputformat/hadoop/BytesArrayWritable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/inputformat/hadoop/BytesArrayWritable.java b/src/main/java/org/apache/pirk/inputformat/hadoop/BytesArrayWritable.java index 48078df..b8db7fd 100644 --- a/src/main/java/org/apache/pirk/inputformat/hadoop/BytesArrayWritable.java +++ b/src/main/java/org/apache/pirk/inputformat/hadoop/BytesArrayWritable.java @@ -18,17 +18,17 @@ */ package org.apache.pirk.inputformat.hadoop; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; - import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.BytesWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; + /** * ArrayWritable class with ByteWritable entries * @@ -60,7 +60,7 @@ public class BytesArrayWritable extends ArrayWritable /** * Constructor for use when underlying array will be ByteWritable representations of BigInteger objects */ - public BytesArrayWritable(ArrayList<BigInteger> elements) + public BytesArrayWritable(List<BigInteger> elements) { super(BytesWritable.class); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/querier/wideskies/Querier.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java index 2beed14..48be51d 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java @@ -21,6 +21,7 @@ package org.apache.pirk.querier.wideskies; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import org.apache.pirk.encryption.Paillier; import org.apache.pirk.query.wideskies.Query; @@ -37,14 +38,14 @@ public class Querier implements Serializable, Storable private Paillier paillier = null; // Paillier encryption functionality - private ArrayList<String> selectors = null; // selectors + private List<String> selectors = null; // selectors // map to check the embedded selectors in the results for false positives; // if the selector is a fixed size < 32 bits, it is included as is // if the selector is of variable lengths private HashMap<Integer,String> embedSelectorMap = null; - public Querier(ArrayList<String> selectorsInput, Paillier paillierInput, Query queryInput, HashMap<Integer,String> embedSelectorMapInput) + public Querier(List<String> selectorsInput, Paillier paillierInput, Query queryInput, HashMap<Integer,String> embedSelectorMapInput) { selectors = selectorsInput; @@ -65,7 +66,7 @@ public class Querier implements Serializable, Storable return paillier; } - public ArrayList<String> getSelectors() + public List<String> getSelectors() { return selectors; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java index 1ce62ec..928c852 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/decrypt/DecryptResponse.java @@ -26,6 +26,7 @@ import java.io.OutputStreamWriter; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; import java.util.concurrent.ExecutorService; @@ -87,7 +88,7 @@ public class DecryptResponse QueryInfo queryInfo = response.getQueryInfo(); Paillier paillier = querier.getPaillier(); - ArrayList<String> selectors = querier.getSelectors(); + List<String> selectors = querier.getSelectors(); HashMap<Integer,String> embedSelectorMap = querier.getEmbedSelectorMap(); // Perform decryption on the encrypted columns http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index e02dacc..49958e4 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -21,6 +21,7 @@ package org.apache.pirk.querier.wideskies.encrypt; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -56,14 +57,14 @@ public class EncryptQuery private Paillier paillier = null; // Paillier encryption functionality - private ArrayList<String> selectors = null; // selectors for the query + private List<String> selectors = null; // selectors for the query // Map to check the embedded selectors in the results for false positives; // if the selector is a fixed size < 32 bits, it is included as is // if the selector is of variable lengths private HashMap<Integer,String> embedSelectorMap = null; - public EncryptQuery(QueryInfo queryInfoInput, ArrayList<String> selectorsInput, Paillier paillierInput) + public EncryptQuery(QueryInfo queryInfoInput, List<String> selectorsInput, Paillier paillierInput) { queryInfo = queryInfoInput; @@ -94,7 +95,7 @@ public class EncryptQuery return querier; } - public ArrayList<String> getSelectors() + public List<String> getSelectors() { return selectors; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java b/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java index 151d012..9e430d8 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java +++ b/src/main/java/org/apache/pirk/query/wideskies/QueryUtils.java @@ -18,12 +18,6 @@ */ package org.apache.pirk.query.wideskies; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -34,6 +28,7 @@ import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.response.QueryResponseJSON; import org.apache.pirk.utils.KeyedHash; +import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.StringUtils; import org.apache.pirk.utils.SystemConfiguration; import org.elasticsearch.hadoop.mr.WritableArrayWritable; @@ -41,6 +36,12 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * Class for helper methods to perform the encrypted query */ @@ -51,7 +52,7 @@ public class QueryUtils /** * Method to convert the given BigInteger raw data element partitions to a QueryResponseJSON object based upon the given queryType */ - public static QueryResponseJSON extractQueryResponseJSON(QueryInfo queryInfo, QuerySchema qSchema, ArrayList<BigInteger> parts) throws Exception + public static QueryResponseJSON extractQueryResponseJSON(QueryInfo queryInfo, QuerySchema qSchema, List<BigInteger> parts) throws PIRException { QueryResponseJSON qrJSON = new QueryResponseJSON(queryInfo); @@ -103,9 +104,9 @@ public class QueryUtils /** * Method to convert the given data element given by the JSONObject data element into the extracted BigInteger partitions based upon the given queryType */ - public static ArrayList<BigInteger> partitionDataElement(QuerySchema qSchema, JSONObject jsonData, boolean embedSelector) throws Exception + public static List<BigInteger> partitionDataElement(QuerySchema qSchema, JSONObject jsonData, boolean embedSelector) throws PIRException { - ArrayList<BigInteger> parts = new ArrayList<>(); + List<BigInteger> parts = new ArrayList<>(); DataSchema dSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName()); // Add the embedded selector to the parts @@ -164,10 +165,10 @@ public class QueryUtils /** * Method to convert the given data element given by the MapWritable data element into the extracted BigInteger partitions based upon the given queryType */ - public static ArrayList<BigInteger> partitionDataElement(MapWritable dataMap, QuerySchema qSchema, DataSchema dSchema, boolean embedSelector) - throws Exception + public static List<BigInteger> partitionDataElement(MapWritable dataMap, QuerySchema qSchema, DataSchema dSchema, boolean embedSelector) + throws PIRException { - ArrayList<BigInteger> parts = new ArrayList<>(); + List<BigInteger> parts = new ArrayList<>(); logger.debug("queryType = " + qSchema.getSchemaName()); @@ -232,7 +233,7 @@ public class QueryUtils /** * Method to convert the given selector into the extracted BigInteger partitions */ - public static List<BigInteger> embeddedSelectorToPartitions(String selector, String type, DataPartitioner partitioner) throws Exception + public static List<BigInteger> embeddedSelectorToPartitions(String selector, String type, DataPartitioner partitioner) throws PIRException { List<BigInteger> parts; @@ -255,7 +256,7 @@ public class QueryUtils * Method get the embedded selector from a given selector * */ - public static String getEmbeddedSelector(String selector, String type, DataPartitioner partitioner) throws Exception + public static String getEmbeddedSelector(String selector, String type, DataPartitioner partitioner) throws PIRException { String embeddedSelector; @@ -276,7 +277,7 @@ public class QueryUtils /** * Reconstructs the String version of the embedded selector from its partitions */ - public static String getEmbeddedSelectorFromPartitions(ArrayList<BigInteger> parts, int partsIndex, String type, Object partitioner) throws Exception + public static String getEmbeddedSelectorFromPartitions(List<BigInteger> parts, int partsIndex, String type, Object partitioner) throws PIRException { String embeddedSelector; @@ -339,7 +340,7 @@ public class QueryUtils if (dSchema.isArrayElement(fieldName)) { - ArrayList<String> elementArray = StringUtils.jsonArrayStringToArrayList(dataMap.get(fieldName).toString()); + List<String> elementArray = StringUtils.jsonArrayStringToArrayList(dataMap.get(fieldName).toString()); selector = elementArray.get(0); } else @@ -350,12 +351,12 @@ public class QueryUtils } // For debug - private static void printParts(ArrayList<BigInteger> parts) + private static void printParts(List<BigInteger> parts) { int i = 0; for (BigInteger part : parts) { - logger.debug("parts(" + i + ") = " + parts.get(i).intValue() + " parts bits = " + parts.get(i).toString(2)); + logger.debug("parts(" + i + ") = " + part.intValue() + " parts bits = " + part.toString(2)); ++i; } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java index 5eed275..2e58c0f 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java @@ -18,14 +18,9 @@ */ package org.apache.pirk.responder.wideskies.common; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.ExecutionException; - +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pirk.encryption.ModPowAbstraction; @@ -33,13 +28,17 @@ import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; import org.apache.pirk.query.wideskies.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Tuple2; import scala.Tuple3; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutionException; /** * Class to compute the encrypted row elements for a query from extracted data partitions @@ -99,10 +98,10 @@ public class ComputeEncryptedRow * Emits {@code Tuple2<<colNum, colVal>>} * */ - public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int rowIndex, + public static List<Tuple2<Long,BigInteger>> computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); // Pull the corresponding encrypted row query BigInteger rowQuery = query.getQueryElement(rowIndex); @@ -154,7 +153,7 @@ public class ComputeEncryptedRow } /** - * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link ArrayList<BigInteger> * * * * } + * Method to compute the encrypted row elements for a query from extracted data partitions in the form of Iterable{@link List<BigInteger> * * * * } * <p> * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values. * <p> @@ -163,17 +162,17 @@ public class ComputeEncryptedRow * Emits {@code Tuple2<<colNum, colVal>>} * */ - public static ArrayList<Tuple2<Long,BigInteger>> computeEncRowBI(Iterable<ArrayList<BigInteger>> dataPartitionsIter, Query query, int rowIndex, + public static List<Tuple2<Long,BigInteger>> computeEncRowBI(Iterable<List<BigInteger>> dataPartitionsIter, Query query, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); // Pull the corresponding encrypted row query BigInteger rowQuery = query.getQueryElement(rowIndex); long colCounter = 0; int elementCounter = 0; - for (ArrayList<BigInteger> dataPartitions : dataPartitionsIter) + for (List<BigInteger> dataPartitions : dataPartitionsIter) { // long startTime = System.currentTimeMillis(); @@ -235,14 +234,14 @@ public class ComputeEncryptedRow * Emits {@code Tuple2<<colNum, colVal>>} * */ - public static ArrayList<Tuple2<Long,BigInteger>> computeEncRowCacheInput(Iterable<ArrayList<BigInteger>> dataPartitionsIter, + public static List<Tuple2<Long,BigInteger>> computeEncRowCacheInput(Iterable<List<BigInteger>> dataPartitionsIter, HashMap<Integer,BigInteger> cache, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector) throws IOException { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); long colCounter = 0; int elementCounter = 0; - for (ArrayList<BigInteger> dataPartitions : dataPartitionsIter) + for (List<BigInteger> dataPartitions : dataPartitionsIter) { logger.debug("elementCounter = " + elementCounter); @@ -284,9 +283,9 @@ public class ComputeEncryptedRow * Emits {@code Tuple2<<colNum, colVal>>} * */ - public static ArrayList<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable dataPartitions, Query query, int rowIndex, int colIndex) throws IOException + public static List<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable dataPartitions, Query query, int rowIndex, int colIndex) throws IOException { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); // Pull the corresponding encrypted row query BigInteger rowQuery = query.getQueryElement(rowIndex); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java index 7a652df..61169f2 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java @@ -19,7 +19,7 @@ package org.apache.pirk.responder.wideskies.common; import java.math.BigInteger; -import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.io.MapWritable; import org.apache.pirk.inputformat.hadoop.BytesArrayWritable; @@ -42,11 +42,9 @@ public class HashSelectorAndPartitionData { private static final Logger logger = LoggerFactory.getLogger(HashSelectorAndPartitionData.class); - public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitionsBigInteger(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, + public static Tuple2<Integer,List<BigInteger>> hashSelectorAndFormPartitionsBigInteger(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple; - // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema); int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); @@ -54,18 +52,14 @@ public class HashSelectorAndPartitionData // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); - - returnTuple = new Tuple2<>(hash, hitValPartitions); + List<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); - return returnTuple; + return new Tuple2<>(hash, hitValPartitions); } public static Tuple2<Integer,BytesArrayWritable> hashSelectorAndFormPartitions(MapWritable dataElement, QuerySchema qSchema, DataSchema dSchema, QueryInfo queryInfo) throws Exception { - Tuple2<Integer,BytesArrayWritable> returnTuple; - // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryType(dataElement, qSchema, dSchema); int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); @@ -73,18 +67,14 @@ public class HashSelectorAndPartitionData // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); + List<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(dataElement, qSchema, dSchema, queryInfo.getEmbedSelector()); BytesArrayWritable bAW = new BytesArrayWritable(hitValPartitions); - returnTuple = new Tuple2<>(hash, bAW); - - return returnTuple; + return new Tuple2<>(hash, bAW); } - public static Tuple2<Integer,ArrayList<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo, QuerySchema qSchema) throws Exception + public static Tuple2<Integer,List<BigInteger>> hashSelectorAndFormPartitions(JSONObject json, QueryInfo queryInfo, QuerySchema qSchema) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple; - // Pull the selector based on the query type String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json); int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); @@ -92,10 +82,8 @@ public class HashSelectorAndPartitionData // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, json, queryInfo.getEmbedSelector()); - - returnTuple = new Tuple2<>(hash, hitValPartitions); + List<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, json, queryInfo.getEmbedSelector()); - return returnTuple; + return new Tuple2<>(hash, hitValPartitions); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java index 3b05a52..61723d0 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java @@ -18,10 +18,6 @@ */ package org.apache.pirk.responder.wideskies.mapreduce; -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -39,9 +35,12 @@ import org.apache.pirk.utils.FileConst; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Tuple2; +import java.io.IOException; +import java.math.BigInteger; +import java.util.List; + /** * Reducer class to calculate the encrypted rows of the encrypted query * <p> @@ -121,7 +120,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW } // Compute the encrypted row elements for a query from extracted data partitions - ArrayList<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRow(dataElementPartitions, query, rowIndex.get(), limitHitsPerSelector, + List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRow(dataElementPartitions, query, rowIndex.get(), limitHitsPerSelector, maxHitsPerSelector, useLocalCache); // Emit <colNum, colVal> http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index 2de4a2a..3124a3f 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -21,6 +21,7 @@ package org.apache.pirk.responder.wideskies.spark; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -331,10 +332,10 @@ public class ComputeResponse // Extract the selectors for each dataElement based upon the query type // and perform a keyed hash of the selectors - JavaPairRDD<Integer,ArrayList<BigInteger>> selectorHashToDocRDD = inputRDD.mapToPair(new HashSelectorsAndPartitionData(accum, bVars)); + JavaPairRDD<Integer,List<BigInteger>> selectorHashToDocRDD = inputRDD.mapToPair(new HashSelectorsAndPartitionData(accum, bVars)); // Group by hashed selector (row) -- can combine with the line above, separating for testing and benchmarking... - JavaPairRDD<Integer,Iterable<ArrayList<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey(); + JavaPairRDD<Integer,Iterable<List<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey(); // Calculate the encrypted row values for each row, emit <colNum, colVal> for each row JavaPairRDD<Long,BigInteger> encRowRDD; @@ -347,7 +348,7 @@ public class ComputeResponse JavaPairRDD<Integer,Iterable<Tuple2<Integer,BigInteger>>> expCalculations = ComputeExpLookupTable.computeExpTable(sc, fs, bVars, query, queryInput, outputDirExp, useModExpJoin); - JavaPairRDD<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<ArrayList<BigInteger>>>> encMapDataJoin = expCalculations.join(selectorGroupRDD); + JavaPairRDD<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>> encMapDataJoin = expCalculations.join(selectorGroupRDD); // Calculate the encrypted row values for each row, emit <colNum, colVal> for each row encRowRDD = encMapDataJoin.flatMapToPair(new EncRowCalcPrecomputedCache(accum, bVars)); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java index 0e860dd..04f8cc2 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java @@ -21,6 +21,7 @@ package org.apache.pirk.responder.wideskies.spark; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -41,7 +42,7 @@ import scala.Tuple2; * Emits {@code <colNum, colVal>} * */ -public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<ArrayList<BigInteger>>>,Long,BigInteger> +public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<List<BigInteger>>>,Long,BigInteger> { private static final long serialVersionUID = 1L; @@ -73,9 +74,9 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A } @Override - public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<ArrayList<BigInteger>>> hashDocTuple) throws Exception + public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<List<BigInteger>>> hashDocTuple) throws Exception { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); int rowIndex = hashDocTuple._1; accum.incNumHashes(1); @@ -98,7 +99,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<A // long startTime = System.currentTimeMillis(); // Compute the encrypted row elements for a query from extracted data partitions - ArrayList<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector, + List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector, maxHitsPerSelector, useLocalCache); // long endTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java index c7610f8..038287b 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java @@ -21,6 +21,7 @@ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; @@ -34,7 +35,7 @@ import scala.Tuple2; * Functionality for computing the encrypted rows using a pre-computed, passed in modular exponentiation lookup table */ public class EncRowCalcPrecomputedCache implements - PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<ArrayList<BigInteger>>>>,Long,BigInteger> + PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>>,Long,BigInteger> { private static final long serialVersionUID = 1L; @@ -64,10 +65,10 @@ public class EncRowCalcPrecomputedCache implements } @Override - public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<ArrayList<BigInteger>>>> hashDocTuple) + public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>> hashDocTuple) throws Exception { - ArrayList<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); int rowIndex = hashDocTuple._1; accum.incNumHashes(1); @@ -80,13 +81,13 @@ public class EncRowCalcPrecomputedCache implements expTable.put(entry._1, entry._2); } - Iterable<ArrayList<BigInteger>> dataPartitions = hashDocTuple._2._2; + Iterable<List<BigInteger>> dataPartitions = hashDocTuple._2._2; // logger.debug("Encrypting row = " + rowIndex); // long startTime = System.currentTimeMillis(); // Compute the encrypted row elements for a query from extracted data partitions - ArrayList<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowCacheInput(dataPartitions, expTable, rowIndex, limitHitsPerSelector, + List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowCacheInput(dataPartitions, expTable, rowIndex, limitHitsPerSelector, maxHitsPerSelector); // long endTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java index 6e9c715..f426aad 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java @@ -20,6 +20,7 @@ package org.apache.pirk.responder.wideskies.spark; import java.math.BigInteger; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.io.MapWritable; import org.apache.pirk.query.wideskies.QueryInfo; @@ -37,7 +38,7 @@ import scala.Tuple2; * output {@code <hash(selector), dataPartitions>} * */ -public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,Integer,ArrayList<BigInteger>> +public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,Integer,List<BigInteger>> { private static final long serialVersionUID = 1L; @@ -57,13 +58,9 @@ public class HashSelectorsAndPartitionData implements PairFunction<MapWritable,I } @Override - public Tuple2<Integer,ArrayList<BigInteger>> call(MapWritable doc) throws Exception + public Tuple2<Integer,List<BigInteger>> call(MapWritable doc) throws Exception { - Tuple2<Integer,ArrayList<BigInteger>> returnTuple; - // Extract the selector, compute the hash, and partition the data element according to query type - returnTuple = HashSelectorAndPartitionData.hashSelectorAndFormPartitionsBigInteger(doc, qSchema, dSchema, queryInfo); - - return returnTuple; + return HashSelectorAndPartitionData.hashSelectorAndFormPartitionsBigInteger(doc, qSchema, dSchema, queryInfo); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index 7883fb8..03e5a76 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -18,13 +18,6 @@ */ package org.apache.pirk.responder.wideskies.standalone; -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.TreeMap; - import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; @@ -40,6 +33,14 @@ import org.json.simple.parser.JSONParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; + /** * Class to perform stand alone responder functionalities * <p> @@ -59,8 +60,6 @@ public class Responder private QueryInfo queryInfo = null; private QuerySchema qSchema = null; - private String queryType = null; - private Response response = null; private TreeMap<Integer,BigInteger> columns = null; // the column values for the PIR calculations @@ -71,7 +70,7 @@ public class Responder { query = queryInput; queryInfo = query.getQueryInfo(); - queryType = queryInfo.getQueryType(); + String queryType = queryInfo.getQueryType(); if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false)) { @@ -170,7 +169,7 @@ public class Responder { // Extract the data bits based on the query type // Partition by the given partitionSize - ArrayList<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, jsonData, queryInfo.getEmbedSelector()); + List<BigInteger> hitValPartitions = QueryUtils.partitionDataElement(qSchema, jsonData, queryInfo.getEmbedSelector()); // Pull the necessary elements int rowIndex = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java index f767a5a..7559441 100644 --- a/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java +++ b/src/main/java/org/apache/pirk/schema/data/partitioner/PrimitiveTypePartitioner.java @@ -48,8 +48,7 @@ public class PrimitiveTypePartitioner implements DataPartitioner public static final String STRING = "string"; /** - * Splits the given BigInteger into partitions given by the partitionSize - * + * Splits the given BigInteger into partitions given by the partitionSize. */ public static List<BigInteger> partitionBits(BigInteger value, int partitionSize, BigInteger mask) throws PIRException { @@ -90,12 +89,20 @@ public class PrimitiveTypePartitioner implements DataPartitioner } /** - * Method to form a BigInteger bit mask for the given partitionSize - * + * Returns a <code>BigInteger</code> bit mask for the given partitionSize. */ public static BigInteger formBitMask(int partitionSize) { - return BigInteger.valueOf(2).pow(partitionSize).subtract(BigInteger.ONE); + BigInteger mask; + if (partitionSize < 32) + { + mask = BigInteger.valueOf((1 << partitionSize) - 1); + } + else + { + mask = BigInteger.valueOf(2).pow(partitionSize).subtract(BigInteger.ONE); + } + return mask; } /** @@ -229,13 +236,12 @@ public class PrimitiveTypePartitioner implements DataPartitioner } /** - * - * Partitions an object to an ArrayList of BigInteger values, currently represents an 8-bit partitioning + * Partitions an object to a List of BigInteger values, currently represents an 8-bit partitioning */ @Override - public ArrayList<BigInteger> toPartitions(Object obj, String type) throws PIRException + public List<BigInteger> toPartitions(Object obj, String type) throws PIRException { - ArrayList<BigInteger> parts = new ArrayList<>(); + List<BigInteger> parts = new ArrayList<>(); byte[] bytes = new byte[0]; @@ -325,9 +331,9 @@ public class PrimitiveTypePartitioner implements DataPartitioner * Create partitions for an array of the same type of elements - used when a data value field is an array and we wish to encode these into the return value */ @Override - public ArrayList<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException + public List<BigInteger> arrayToPartitions(List<?> elementList, String type) throws PIRException { - ArrayList<BigInteger> parts = new ArrayList<>(); + List<BigInteger> parts = new ArrayList<>(); int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1); for (int i = 0; i < numArrayElementsToReturn; ++i) http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java index 496840c..ee37e63 100755 --- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java @@ -19,6 +19,7 @@ package org.apache.pirk.test.distributed; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -69,7 +70,7 @@ public class DistributedTestDriver logger.info("jarFile = " + jarFile); SystemConfiguration.setProperty("jarFile", jarFile); - ArrayList<JSONObject> dataElements = initialize(fs); + List<JSONObject> dataElements = initialize(fs); // Pull off the properties and reset upon completion String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none"); @@ -85,9 +86,9 @@ public class DistributedTestDriver /** * Create all inputs */ - public static ArrayList<JSONObject> initialize(FileSystem fs) throws Exception + public static List<JSONObject> initialize(FileSystem fs) throws Exception { - ArrayList<JSONObject> dataElements = Inputs.createPIRJSONInput(fs); + List<JSONObject> dataElements = Inputs.createPIRJSONInput(fs); String localStopListFile = Inputs.createPIRStopList(fs, true); SystemConfiguration.setProperty("pir.stopListFile", localStopListFile); @@ -100,7 +101,7 @@ public class DistributedTestDriver /** * Execute Tests */ - public static void test(FileSystem fs, DistributedTestCLI cli, ArrayList<JSONObject> pirDataElements) throws Exception + public static void test(FileSystem fs, DistributedTestCLI cli, List<JSONObject> pirDataElements) throws Exception { if (cli.run("1:J")) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java index 0556ead..58f835c 100644 --- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -58,7 +58,7 @@ public class DistTestSuite // This method also tests all non-query specific configuration options/properties // for the MapReduce version of PIR - public static void testJSONInputMR(FileSystem fs, ArrayList<JSONObject> dataElements) throws Exception + public static void testJSONInputMR(FileSystem fs, List<JSONObject> dataElements) throws Exception { logger.info("Starting testJSONInputMR"); @@ -151,7 +151,7 @@ public class DistTestSuite logger.info("Completed testJSONInputMR"); } - public static void testESInputMR(FileSystem fs, ArrayList<JSONObject> dataElements) throws Exception + public static void testESInputMR(FileSystem fs, List<JSONObject> dataElements) throws Exception { logger.info("Starting testESInputMR"); @@ -190,7 +190,7 @@ public class DistTestSuite logger.info("Completed testESInputMR"); } - public static void testJSONInputSpark(FileSystem fs, ArrayList<JSONObject> dataElements) throws Exception + public static void testJSONInputSpark(FileSystem fs, List<JSONObject> dataElements) throws Exception { logger.info("Starting testJSONInputSpark"); @@ -282,7 +282,7 @@ public class DistTestSuite logger.info("Completed testJSONInputSpark"); } - public static void testESInputSpark(FileSystem fs, ArrayList<JSONObject> dataElements) throws Exception + public static void testESInputSpark(FileSystem fs, List<JSONObject> dataElements) throws Exception { logger.info("Starting testESInputSpark"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java index 26ab2eb..a55ed4d 100644 --- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -18,15 +18,6 @@ */ package org.apache.pirk.test.utils; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - import org.apache.hadoop.fs.FileSystem; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.schema.query.QuerySchema; @@ -40,6 +31,15 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.fail; + /** * Class to hold the base functional distributed tests */ @@ -65,14 +65,14 @@ public class BaseTests testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive); } - public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false); } // Query for the watched hostname occurred; ; watched value type: hostname (String) - public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, + public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean testFalsePositive) throws Exception { logger.info("Running testDNSHostnameQuery(): "); @@ -197,7 +197,7 @@ public class BaseTests } // The watched IP address was detected in the response to a query; watched value type: IP address (String) - public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { logger.info("Running testDNSIPQuery(): "); @@ -270,7 +270,7 @@ public class BaseTests } // A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String) - public static void testDNSNXDOMAINQuery(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { logger.info("Running testDNSNXDOMAINQuery(): "); @@ -334,7 +334,7 @@ public class BaseTests } // Query for responses from watched srcIPs - public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { logger.info("Running testSRCIPQuery(): "); @@ -406,7 +406,7 @@ public class BaseTests } // Query for responses from watched srcIPs - public static void testSRCIPQueryNoFilter(ArrayList<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) + public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { logger.info("Running testSRCIPQueryNoFilter(): "); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index 98aa5db..10c1386 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -260,13 +260,12 @@ public class Inputs /** * Creates PIR JSON input and writes to hdfs */ - @SuppressWarnings("unchecked") - public static ArrayList<JSONObject> createPIRJSONInput(FileSystem fs) + public static List<JSONObject> createPIRJSONInput(FileSystem fs) { String inputJSONFile = SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY); logger.info("PIR JSON input being created at " + inputJSONFile); - ArrayList<JSONObject> dataElementsJSON = createJSONDataElements(); + List<JSONObject> dataElementsJSON = createJSONDataElements(); HDFS.writeFile(dataElementsJSON, fs, inputJSONFile, true); logger.info("PIR JSON input successfully created!"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java index a4bf67d..1c26bdd 100644 --- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java +++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java @@ -18,14 +18,6 @@ */ package org.apache.pirk.test.utils; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - import org.apache.pirk.encryption.Paillier; import org.apache.pirk.querier.wideskies.Querier; import org.apache.pirk.querier.wideskies.QuerierConst; @@ -46,6 +38,13 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.fail; + public class StandaloneQuery { private static final Logger logger = LoggerFactory.getLogger(StandaloneQuery.class); @@ -57,7 +56,7 @@ public class StandaloneQuery String testQuerySchemaName = "testQuerySchema"; // Base method to perform the query - public static List<QueryResponseJSON> performStandaloneQuery(ArrayList<JSONObject> dataElements, String queryType, ArrayList<String> selectors, + public static List<QueryResponseJSON> performStandaloneQuery(List<JSONObject> dataElements, String queryType, List<String> selectors, int numThreads, boolean testFalsePositive) throws IOException, InterruptedException, PIRException { logger.info("Performing watchlisting: "); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/main/java/org/apache/pirk/utils/KeyedHash.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/KeyedHash.java b/src/main/java/org/apache/pirk/utils/KeyedHash.java index 579b196..3dfa15d 100644 --- a/src/main/java/org/apache/pirk/utils/KeyedHash.java +++ b/src/main/java/org/apache/pirk/utils/KeyedHash.java @@ -18,12 +18,12 @@ */ package org.apache.pirk.utils; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + /** * Class for the PIR keyed hash * <p> @@ -39,9 +39,7 @@ public class KeyedHash */ public static int hash(String key, int bitSize, String input) { - String concat = key + input; - - int fullHash = Math.abs(concat.hashCode()); + int fullHash = (key + input).hashCode(); // Take only the lower bitSize-many bits of the resultant hash int bitLimitedHash = fullHash; @@ -61,10 +59,9 @@ public class KeyedHash { int bitLimitedHash; - MessageDigest md; try { - md = MessageDigest.getInstance(hashType); + MessageDigest md = MessageDigest.getInstance(hashType); byte[] array = md.digest(input.getBytes()); int hashInt = fromByteArray(array); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/d992fa29/src/test/java/org/apache/pirk/general/PartitionUtilsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/general/PartitionUtilsTest.java b/src/test/java/org/apache/pirk/general/PartitionUtilsTest.java index ccca2d7..6f779cd 100644 --- a/src/test/java/org/apache/pirk/general/PartitionUtilsTest.java +++ b/src/test/java/org/apache/pirk/general/PartitionUtilsTest.java @@ -18,14 +18,6 @@ */ package org.apache.pirk.general; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - import org.apache.pirk.schema.data.partitioner.IPDataPartitioner; import org.apache.pirk.schema.data.partitioner.ISO8601DatePartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; @@ -35,6 +27,13 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + /** * Class to functionally test the bit conversion utils */ @@ -47,15 +46,23 @@ public class PartitionUtilsTest { logger.info("Starting testMask: "); - BigInteger mask = PrimitiveTypePartitioner.formBitMask(4); // 1111 + assertEquals(0, PrimitiveTypePartitioner.formBitMask(0).intValue()); - assertEquals(mask.intValue(), 15); + assertEquals(0b000000000000001, PrimitiveTypePartitioner.formBitMask(1).intValue()); + assertEquals(0b000000000001111, PrimitiveTypePartitioner.formBitMask(4).intValue()); + assertEquals(0b000000001111111, PrimitiveTypePartitioner.formBitMask(7).intValue()); + assertEquals(0b111111111111111, PrimitiveTypePartitioner.formBitMask(15).intValue()); + + assertEquals(new BigInteger("FFFFF", 16), PrimitiveTypePartitioner.formBitMask(20)); + assertEquals(new BigInteger("FFFFFFFF", 16), PrimitiveTypePartitioner.formBitMask(32)); + assertEquals(new BigInteger("3FFFFFFFFFF", 16), PrimitiveTypePartitioner.formBitMask(42)); + assertEquals(new BigInteger("7FFFFFFFFFFFFFFF", 16), PrimitiveTypePartitioner.formBitMask(63)); logger.info("Successfully completed testMask"); } @Test - public void testPartitionBits() + public void testPartitionBits() throws PIRException { logger.info("Starting testPartitionBits: "); @@ -65,52 +72,29 @@ public class PartitionUtilsTest BigInteger mask4 = PrimitiveTypePartitioner.formBitMask(4); // 1111 BigInteger mask8 = PrimitiveTypePartitioner.formBitMask(8); // 11111111 - try - { - List<BigInteger> partitions = PrimitiveTypePartitioner.partitionBits(value, 4, mask4); + List<BigInteger> partitions = PrimitiveTypePartitioner.partitionBits(value, 4, mask4); + assertEquals(2, partitions.size()); + assertEquals(0b1111, partitions.get(0).intValue()); + assertEquals(0b0101, partitions.get(1).intValue()); - assertEquals(2, partitions.size()); - assertEquals(partitions.get(0).intValue(), 15); // 1111 - assertEquals(partitions.get(1).intValue(), 5); // 0101 + partitions = PrimitiveTypePartitioner.partitionBits(value2, 4, mask4); + assertEquals(3, partitions.size()); + assertEquals(0b1111, partitions.get(0).intValue()); + assertEquals(0b0101, partitions.get(1).intValue()); + assertEquals(0b0011, partitions.get(2).intValue()); - } catch (Exception e) - { - fail(e.toString()); - } + partitions = PrimitiveTypePartitioner.partitionBits(value, 8, mask8); + assertEquals(1, partitions.size()); + assertEquals(0b11110101, partitions.get(0).intValue()); try { - List<BigInteger> partitions = PrimitiveTypePartitioner.partitionBits(value2, 4, mask4); - - assertEquals(3, partitions.size()); - assertEquals(partitions.get(0).intValue(), 15); // 1111 - assertEquals(partitions.get(1).intValue(), 5); // 0101 - assertEquals(partitions.get(2).intValue(), 3); // 11 - - } catch (Exception e) - { - fail(e.toString()); - } - try - { - List<BigInteger> partitions = PrimitiveTypePartitioner.partitionBits(value, 8, mask8); - - assertEquals(1, partitions.size()); - assertEquals(partitions.get(0).intValue(), 245); - - } catch (Exception e) - { - fail(e.toString()); - } - - try - { - List<BigInteger> partitions = PrimitiveTypePartitioner.partitionBits(value, 4, mask8); - + partitions = PrimitiveTypePartitioner.partitionBits(value, 4, mask8); fail("BitConversionUtils.partitionBits did not throw error for mismatched partitionSize and mask size"); - } catch (Exception ignore) - {} + { + // Expected. + } logger.info("Successfully completed testPartitionBits"); } @@ -138,7 +122,7 @@ public class PartitionUtilsTest // Test byte byte bTest = Byte.parseByte("10"); - ArrayList<BigInteger> partsByte = primitivePartitioner.toPartitions(bTest, PrimitiveTypePartitioner.BYTE); + List<BigInteger> partsByte = primitivePartitioner.toPartitions(bTest, PrimitiveTypePartitioner.BYTE); assertEquals(1, partsByte.size()); assertEquals(bTest, primitivePartitioner.fromPartitions(partsByte, 0, PrimitiveTypePartitioner.BYTE)); @@ -146,7 +130,7 @@ public class PartitionUtilsTest assertEquals(1, partsByte.size()); assertEquals((byte) 12, primitivePartitioner.fromPartitions(partsByte, 0, PrimitiveTypePartitioner.BYTE)); - ArrayList<BigInteger> partsByteMax = primitivePartitioner.toPartitions(Byte.MAX_VALUE, PrimitiveTypePartitioner.BYTE); + List<BigInteger> partsByteMax = primitivePartitioner.toPartitions(Byte.MAX_VALUE, PrimitiveTypePartitioner.BYTE); assertEquals(1, partsByteMax.size()); assertEquals(Byte.MAX_VALUE, primitivePartitioner.fromPartitions(partsByteMax, 0, PrimitiveTypePartitioner.BYTE)); @@ -159,7 +143,7 @@ public class PartitionUtilsTest // Test short short shortTest = Short.valueOf("2456"); - ArrayList<BigInteger> partsShort = primitivePartitioner.toPartitions(shortTest, PrimitiveTypePartitioner.SHORT); + List<BigInteger> partsShort = primitivePartitioner.toPartitions(shortTest, PrimitiveTypePartitioner.SHORT); assertEquals(2, partsShort.size()); assertEquals(shortTest, primitivePartitioner.fromPartitions(partsShort, 0, PrimitiveTypePartitioner.SHORT)); @@ -171,13 +155,13 @@ public class PartitionUtilsTest assertEquals(2, partsShort.size()); assertEquals((short) -42, primitivePartitioner.fromPartitions(partsShort, 0, PrimitiveTypePartitioner.SHORT)); - ArrayList<BigInteger> partsShortMax = primitivePartitioner.toPartitions(Short.MAX_VALUE, PrimitiveTypePartitioner.SHORT); + List<BigInteger> partsShortMax = primitivePartitioner.toPartitions(Short.MAX_VALUE, PrimitiveTypePartitioner.SHORT); assertEquals(2, partsShortMax.size()); assertEquals(Short.MAX_VALUE, primitivePartitioner.fromPartitions(partsShortMax, 0, PrimitiveTypePartitioner.SHORT)); // Test int int intTest = Integer.parseInt("-5789"); - ArrayList<BigInteger> partsInt = primitivePartitioner.toPartitions(intTest, PrimitiveTypePartitioner.INT); + List<BigInteger> partsInt = primitivePartitioner.toPartitions(intTest, PrimitiveTypePartitioner.INT); assertEquals(4, partsInt.size()); assertEquals(intTest, primitivePartitioner.fromPartitions(partsInt, 0, PrimitiveTypePartitioner.INT)); @@ -189,23 +173,23 @@ public class PartitionUtilsTest assertEquals(4, partsInt.size()); assertEquals(1386681237, primitivePartitioner.fromPartitions(partsInt, 0, PrimitiveTypePartitioner.INT)); - ArrayList<BigInteger> partsIntMax = primitivePartitioner.toPartitions(Integer.MAX_VALUE, PrimitiveTypePartitioner.INT); + List<BigInteger> partsIntMax = primitivePartitioner.toPartitions(Integer.MAX_VALUE, PrimitiveTypePartitioner.INT); assertEquals(4, partsIntMax.size()); assertEquals(Integer.MAX_VALUE, primitivePartitioner.fromPartitions(partsIntMax, 0, PrimitiveTypePartitioner.INT)); // Test long long longTest = Long.parseLong("56789"); - ArrayList<BigInteger> partsLong = primitivePartitioner.toPartitions(longTest, PrimitiveTypePartitioner.LONG); + List<BigInteger> partsLong = primitivePartitioner.toPartitions(longTest, PrimitiveTypePartitioner.LONG); assertEquals(8, partsLong.size()); assertEquals(longTest, primitivePartitioner.fromPartitions(partsLong, 0, PrimitiveTypePartitioner.LONG)); - ArrayList<BigInteger> partsLongMax = primitivePartitioner.toPartitions(Long.MAX_VALUE, PrimitiveTypePartitioner.LONG); + List<BigInteger> partsLongMax = primitivePartitioner.toPartitions(Long.MAX_VALUE, PrimitiveTypePartitioner.LONG); assertEquals(8, partsLongMax.size()); assertEquals(Long.MAX_VALUE, primitivePartitioner.fromPartitions(partsLongMax, 0, PrimitiveTypePartitioner.LONG)); // Test float float floatTest = Float.parseFloat("567.77"); - ArrayList<BigInteger> partsFloat = primitivePartitioner.toPartitions(floatTest, PrimitiveTypePartitioner.FLOAT); + List<BigInteger> partsFloat = primitivePartitioner.toPartitions(floatTest, PrimitiveTypePartitioner.FLOAT); assertEquals(4, partsFloat.size()); assertEquals(floatTest, primitivePartitioner.fromPartitions(partsFloat, 0, PrimitiveTypePartitioner.FLOAT)); @@ -213,23 +197,23 @@ public class PartitionUtilsTest assertEquals(4, partsFloat.size()); assertEquals(-99.99f, primitivePartitioner.fromPartitions(partsFloat, 0, PrimitiveTypePartitioner.FLOAT)); - ArrayList<BigInteger> partsFloatMax = primitivePartitioner.toPartitions(Float.MAX_VALUE, PrimitiveTypePartitioner.FLOAT); + List<BigInteger> partsFloatMax = primitivePartitioner.toPartitions(Float.MAX_VALUE, PrimitiveTypePartitioner.FLOAT); assertEquals(4, partsFloatMax.size()); assertEquals(Float.MAX_VALUE, primitivePartitioner.fromPartitions(partsFloatMax, 0, PrimitiveTypePartitioner.FLOAT)); // Test double double doubleTest = Double.parseDouble("567.77"); - ArrayList<BigInteger> partsDouble = primitivePartitioner.toPartitions(doubleTest, PrimitiveTypePartitioner.DOUBLE); + List<BigInteger> partsDouble = primitivePartitioner.toPartitions(doubleTest, PrimitiveTypePartitioner.DOUBLE); assertEquals(8, partsDouble.size()); assertEquals(doubleTest, primitivePartitioner.fromPartitions(partsDouble, 0, PrimitiveTypePartitioner.DOUBLE)); - ArrayList<BigInteger> partsDoubleMax = primitivePartitioner.toPartitions(Double.MAX_VALUE, PrimitiveTypePartitioner.DOUBLE); + List<BigInteger> partsDoubleMax = primitivePartitioner.toPartitions(Double.MAX_VALUE, PrimitiveTypePartitioner.DOUBLE); assertEquals(8, partsDoubleMax.size()); assertEquals(Double.MAX_VALUE, primitivePartitioner.fromPartitions(partsDoubleMax, 0, PrimitiveTypePartitioner.DOUBLE)); // Test char char charTest = 'b'; - ArrayList<BigInteger> partsChar = primitivePartitioner.toPartitions(charTest, PrimitiveTypePartitioner.CHAR); + List<BigInteger> partsChar = primitivePartitioner.toPartitions(charTest, PrimitiveTypePartitioner.CHAR); assertEquals(2, partsChar.size()); assertEquals(charTest, primitivePartitioner.fromPartitions(partsChar, 0, PrimitiveTypePartitioner.CHAR)); @@ -244,7 +228,7 @@ public class PartitionUtilsTest assertEquals(2, partsChar.size()); assertEquals(charTest, primitivePartitioner.fromPartitions(partsChar, 0, PrimitiveTypePartitioner.CHAR)); - ArrayList<BigInteger> partsCharMax = primitivePartitioner.toPartitions(Character.MAX_VALUE, PrimitiveTypePartitioner.CHAR); + List<BigInteger> partsCharMax = primitivePartitioner.toPartitions(Character.MAX_VALUE, PrimitiveTypePartitioner.CHAR); assertEquals(2, partsCharMax.size()); assertEquals(Character.MAX_VALUE, primitivePartitioner.fromPartitions(partsCharMax, 0, PrimitiveTypePartitioner.CHAR)); @@ -269,7 +253,7 @@ public class PartitionUtilsTest { PrimitiveTypePartitioner ptp = new PrimitiveTypePartitioner(); - ArrayList<BigInteger> partsString = ptp.toPartitions(testString, PrimitiveTypePartitioner.STRING); + List<BigInteger> partsString = ptp.toPartitions(testString, PrimitiveTypePartitioner.STRING); int numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / 8; assertEquals(numParts, partsString.size());
