Revert - Move all tests under src/test directory - closes apache/incubator-pirk#70
Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/9244df72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/9244df72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/9244df72 Branch: refs/heads/master Commit: 9244df72be448aa32b0d146a6f268c363dc16fd6 Parents: a643ae6 Author: smarthi <[email protected]> Authored: Thu Aug 18 12:39:43 2016 -0400 Committer: eawilliams <[email protected]> Committed: Thu Aug 18 12:39:43 2016 -0400 ---------------------------------------------------------------------- pom.xml | 2 + .../apache/pirk/benchmark/BenchmarkDriver.java | 36 ++ .../pirk/benchmark/PaillierBenchmark.java | 126 ++++ .../test/distributed/DistributedTestCLI.java | 188 ++++++ .../test/distributed/DistributedTestDriver.java | 149 +++++ .../distributed/testsuite/DistTestSuite.java | 458 +++++++++++++ .../org/apache/pirk/test/utils/BaseTests.java | 643 +++++++++++++++++++ .../java/org/apache/pirk/test/utils/Inputs.java | 606 +++++++++++++++++ .../apache/pirk/test/utils/StandaloneQuery.java | 164 +++++ .../org/apache/pirk/test/utils/TestUtils.java | 312 +++++++++ .../pirk/general/ISO8601DateParserTest.java | 50 ++ .../org/apache/pirk/general/KeyedHashTest.java | 83 +++ .../org/apache/pirk/general/PaillierTest.java | 303 +++++++++ .../apache/pirk/general/PartitionUtilsTest.java | 269 ++++++++ .../pirk/general/QueryParserUtilsTest.java | 421 ++++++++++++ .../pirk/schema/data/LoadDataSchemaTest.java | 324 ++++++++++ .../pirk/schema/query/LoadQuerySchemaTest.java | 368 +++++++++++ .../pirk/serialization/SerializationTest.java | 134 ++++ .../pirk/test/benchmark/BenchmarkDriver.java | 36 -- .../pirk/test/benchmark/PaillierBenchmark.java | 126 ---- .../test/distributed/DistributedTestCLI.java | 188 ------ .../test/distributed/DistributedTestDriver.java | 149 ----- .../distributed/testsuite/DistTestSuite.java | 458 ------------- 
.../test/general/ISO8601DateParserTest.java | 50 -- .../apache/pirk/test/general/KeyedHashTest.java | 83 --- .../apache/pirk/test/general/PaillierTest.java | 303 --------- .../pirk/test/general/PartitionUtilsTest.java | 269 -------- .../pirk/test/general/QueryParserUtilsTest.java | 421 ------------ .../test/schema/data/LoadDataSchemaTest.java | 327 ---------- .../test/schema/query/LoadQuerySchemaTest.java | 371 ----------- .../test/serialization/SerializationTest.java | 137 ---- .../org/apache/pirk/test/utils/BaseTests.java | 643 ------------------- .../java/org/apache/pirk/test/utils/Inputs.java | 606 ----------------- .../apache/pirk/test/utils/StandaloneQuery.java | 164 ----- .../org/apache/pirk/test/utils/TestUtils.java | 312 --------- .../wideskies/standalone/StandaloneTest.java | 128 ---- .../wideskies/standalone/StandaloneTest.java | 128 ++++ 37 files changed, 4764 insertions(+), 4771 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5a68e32..ff2eb3f 100644 --- a/pom.xml +++ b/pom.xml @@ -451,6 +451,7 @@ <version>3.0.1</version> <configuration> <excludes> + <exclude>org/apache/pirk/benchmark/**</exclude> <exclude>*/openjdk/**</exclude> <exclude>generated-sources/**</exclude> </excludes> @@ -463,6 +464,7 @@ <version>3.0.1</version> <configuration> <excludes> + <exclude>org/apache/pirk/benchmark/**</exclude> <exclude>*/openjdk/**</exclude> <exclude>generated-sources/**</exclude> </excludes> http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/benchmark/BenchmarkDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/benchmark/BenchmarkDriver.java b/src/main/java/org/apache/pirk/benchmark/BenchmarkDriver.java new file mode 100644 
index 0000000..a24d0da --- /dev/null +++ b/src/main/java/org/apache/pirk/benchmark/BenchmarkDriver.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.benchmark; + +import java.io.IOException; + +import org.openjdk.jmh.Main; +import org.openjdk.jmh.runner.RunnerException; + +/** + * Driver for JMH benchmarking + */ +public class BenchmarkDriver +{ + public static void main(String[] args) throws RunnerException, IOException + { + Main.main(args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/benchmark/PaillierBenchmark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/benchmark/PaillierBenchmark.java b/src/main/java/org/apache/pirk/benchmark/PaillierBenchmark.java new file mode 100644 index 0000000..95f850d --- /dev/null +++ b/src/main/java/org/apache/pirk/benchmark/PaillierBenchmark.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.benchmark; + +import java.math.BigInteger; + +import org.apache.pirk.encryption.ModPowAbstraction; +import org.apache.pirk.encryption.Paillier; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JMH benchmark to evaluate Paillier performance both with and without using com.square.jnagmp.gmp to accelerate modPow + * <p> + * Guides to using JMH can be found at: http://tutorials.jenkov.com/java-performance/jmh.html and http://nitschinger.at/Using-JMH-for-Java-Microbenchmarking/ + */ + +public class PaillierBenchmark +{ + private static final int MODULUS_SIZE = 3074; + private static final Logger logger = LoggerFactory.getLogger(PaillierBenchmark.class); + + @State(Scope.Benchmark) + public static class PaillierBenchmarkState + { + BigInteger r1 = null; // random number in (Z/NZ)* + BigInteger m1 = null; // message to encrypt + + Paillier pallier = null; + + /** + * This sets up the state for the two separate benchmarks + */ + @Setup(org.openjdk.jmh.annotations.Level.Trial) + 
public void setUp() + { + int systemPrimeCertainty = SystemConfiguration.getIntProperty("pir.primeCertainty", 100); + try + { + pallier = new Paillier(MODULUS_SIZE, systemPrimeCertainty); + + } catch (PIRException e) + { + System.out.printf("Couldn't build pallier object!%n"); + } + + r1 = BigInteger.valueOf(3); + m1 = BigInteger.valueOf(5); + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void testWithGMP(PaillierBenchmarkState allState) + { + SystemConfiguration.setProperty("paillier.useGMPForModPow", "true"); + SystemConfiguration.setProperty("paillier.GMPConstantTimeMode", "false"); + ModPowAbstraction.reloadConfiguration(); + + try + { + allState.pallier.encrypt(allState.m1, allState.r1); + } catch (PIRException e) + { + logger.info("Exception in testWithGMP!\n"); + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void testWithGMPConstantTime(PaillierBenchmarkState allState) + { + SystemConfiguration.setProperty("paillier.useGMPForModPow", "true"); + SystemConfiguration.setProperty("paillier.GMPConstantTimeMode", "true"); + ModPowAbstraction.reloadConfiguration(); + + try + { + allState.pallier.encrypt(allState.m1, allState.r1); + } catch (PIRException e) + { + logger.info("Exception in testWithGMPConstantTime!\n"); + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void testWithoutGMP(PaillierBenchmarkState allState) + { + SystemConfiguration.setProperty("paillier.useGMPForModPow", "false"); + ModPowAbstraction.reloadConfiguration(); + + try + { + allState.pallier.encrypt(allState.m1, allState.r1); + } catch (PIRException e) + { + logger.info("Exception in testWithoutGMP!\n"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java 
b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java new file mode 100644 index 0000000..1535e1f --- /dev/null +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.test.distributed; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A container for Apache's Command Line Interface that contains custom functionality for the MapReduce functional tests. 
+ */ +public class DistributedTestCLI +{ + private static final Logger logger = LoggerFactory.getLogger(DistributedTestCLI.class); + + private CommandLine commandLine = null; + private Options cliOptions = null; + + /** + * Create and parse allowable options + * + * @param args + * - arguments fed into the main method + */ + public DistributedTestCLI(String[] args) + { + // create the command line options + cliOptions = createOptions(); + + try + { + // parse the command line options + CommandLineParser parser = new GnuParser(); + commandLine = parser.parse(cliOptions, args, true); + + // if help option is selected, just print help text and exit + if (hasOption("h")) + { + printHelp(); + System.exit(1); + } + + // The full path of the jar file must be set + if (!hasOption("j")) + { + logger.info("The full path of the jar file must be set with -j"); + System.exit(1); + } + } catch (Exception e) + { + e.printStackTrace(); + System.exit(1); + } + } + + /** + * Determine if an option was provided by the user via the CLI + * + * @param option + * - the option of interest + * @return true if option was provided, false otherwise + */ + public boolean hasOption(String option) + { + return commandLine.hasOption(option); + } + + /** + * Obtain the argument of the option provided by the user via the CLI + * + * @param option + * - the option of interest + * @return value of the argument of the option + */ + public String getOptionValue(String option) + { + return commandLine.getOptionValue(option); + } + + /** + * Determine if the argument was provided, which determines if a test should or should not be run + * + * @param allowed + * - argument string you are looking for + * @return true if argument was provided via the CLI, false otherwise + */ + public boolean run(String allowed) + { + return run(allowed, "t"); + } + + /** + * Determine if the argument was provided for the selected option, which determines if a test should or should not be run + * + * @param allowed + * - 
argument string you are looking for + * @param option + * - the option of interest + * @return true if argument was provided via the CLI, false otherwise + */ + public boolean run(String allowed, String option) + { + if (!hasOption(option)) + { + return true; + } + + String selection = getOptionValue(option); + String[] selectionList = selection.split(","); + + for (String selectionItem : selectionList) + { + if (selectionItem.equals(allowed)) + { + return true; + } + } + + return false; + } + + /** + * Create the options available for the DistributedTestDriver + * + * @return Apache's CLI Options object + */ + private Options createOptions() + { + Options options = new Options(); + + // help + Option optionHelp = new Option("h", "help", false, "Print out the help documentation for this command line execution"); + optionHelp.setRequired(false); + options.addOption(optionHelp); + + // jar file + Option optionJar = new Option("j", "jar", true, "required -- Fully qualified jar file"); + optionJar.setRequired(false); + options.addOption(optionJar); + + // test selection + String tests = "testNum = 1: Wideskies Tests\n"; + tests += "Subtests:\n"; + tests += "E - Elasticsearch MapReduce\n"; + tests += "J - JSON/HDFS MapReduce\n"; + tests += "ES - Elasticsearch Spark \n"; + tests += "JS - JSON/HDFS Spark \n"; + + Option optionTestSelection = new Option("t", "tests", true, "optional -- Select which tests to execute: \n" + tests); + optionTestSelection.setRequired(false); + optionTestSelection.setArgName("<testNum>:<subtestDesignator>"); + optionTestSelection.setType(String.class); + options.addOption(optionTestSelection); + + return options; + } + + /** + * Prints out the help message + */ + private void printHelp() + { + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(140); + formatter.printHelp("DistributedTestDriver", cliOptions); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java new file mode 100755 index 0000000..ee37e63 --- /dev/null +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.test.distributed; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.schema.query.filter.StopListFilter; +import org.apache.pirk.test.distributed.testsuite.DistTestSuite; +import org.apache.pirk.test.utils.Inputs; +import org.apache.pirk.utils.SystemConfiguration; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Driver class to run the suite of functional tests for MR and Spark PIR + * + */ +public class DistributedTestDriver +{ + private static final Logger logger = LoggerFactory.getLogger(DistributedTestDriver.class); + + // Input + public static final String JSON_PIR_INPUT_FILE_PROPERTY = "test.pir.inputJSONFile"; + public static final String ES_PIR_INPUT_INDEX_PROPERTY = "test.pir.es.index"; + public static final String PIR_QUERY_INPUT_DIR = "test.pir.queryInputDir"; + public static final String PIR_STOPLIST_FILE = "test.pir.stopListFile"; + public static final String ES_PIR_INPUT_RESOURCE_PROPERTY = "test.pir.es.resource"; + + // Elastic Search + public static final String ES_INPUT_NODES_PROPERTY = "es.nodes"; + public static final String ES_INPUT_PORT_PROPERTY = "es.port"; + public static final String ES_INPUT_INDEX_PROPERTY = "test.es.index"; + public static final String ES_INPUT_TYPE_PROPERTY = "test.es.type"; + public static final String ES_INPUT_RESOURCE_PROPERTY = "test.es.resource"; + + // Output + public static final String OUTPUT_DIRECTORY_PROPERTY = "test.outputHDFSFile"; + + public static void main(String[] args) throws Exception + { + // create a cli object to handle all program inputs + DistributedTestCLI cli = new DistributedTestCLI(args); + + logger.info("DistributedTest Suite Beginning"); + FileSystem fs = FileSystem.get(new 
Configuration()); + + String jarFile = cli.getOptionValue("j"); + logger.info("jarFile = " + jarFile); + SystemConfiguration.setProperty("jarFile", jarFile); + + List<JSONObject> dataElements = initialize(fs); + + // Pull off the properties and reset upon completion + String dataSchemasProp = SystemConfiguration.getProperty("data.schemas", "none"); + String querySchemasProp = SystemConfiguration.getProperty("query.schemas", "none"); + String stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile"); + + test(fs, cli, dataElements); + + cleanup(fs, dataSchemasProp, querySchemasProp, stopListFileProp); + logger.info("Distributed Test Suite Complete"); + } + + /** + * Create all inputs + */ + public static List<JSONObject> initialize(FileSystem fs) throws Exception + { + List<JSONObject> dataElements = Inputs.createPIRJSONInput(fs); + + String localStopListFile = Inputs.createPIRStopList(fs, true); + SystemConfiguration.setProperty("pir.stopListFile", localStopListFile); + + Inputs.createSchemaFiles(fs, true, StopListFilter.class.getName()); + + return dataElements; + } + + /** + * Execute Tests + */ + public static void test(FileSystem fs, DistributedTestCLI cli, List<JSONObject> pirDataElements) throws Exception + { + if (cli.run("1:J")) + { + DistTestSuite.testJSONInputMR(fs, pirDataElements); + } + if (cli.run("1:E") || cli.run("1:ES")) + { + Inputs.createPIRESInput(); + if (cli.run("1:E")) + { + DistTestSuite.testESInputMR(fs, pirDataElements); + } + if (cli.run("1:ES")) + { + DistTestSuite.testESInputSpark(fs, pirDataElements); + } + } + if (cli.run("1:JS")) + { + DistTestSuite.testJSONInputSpark(fs, pirDataElements); + } + } + + /** + * Delete all necessary inputs, clean up + */ + public static void cleanup(FileSystem fs, String dataSchemasProp, String querySchemasProp, String stopListProp) throws Exception + { + Inputs.deleteESInput(); + fs.close(); + + SystemConfiguration.setProperty("pir.stopListFile", stopListProp); + + // Force the query and 
data schemas to load their original values + if (!dataSchemasProp.equals("none")) + { + DataSchemaLoader.initialize(); + } + + if (!querySchemasProp.equals("none")) + { + QuerySchemaLoader.initialize(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java new file mode 100644 index 0000000..58f835c --- /dev/null +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.test.distributed.testsuite; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.apache.pirk.encryption.Paillier; +import org.apache.pirk.inputformat.hadoop.InputFormatConst; +import org.apache.pirk.inputformat.hadoop.json.JSONInputFormatBase; +import org.apache.pirk.querier.wideskies.Querier; +import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse; +import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery; +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.responder.wideskies.ResponderProps; +import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool; +import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.serialization.HadoopFileSystemStore; +import org.apache.pirk.test.distributed.DistributedTestDriver; +import org.apache.pirk.test.utils.BaseTests; +import org.apache.pirk.test.utils.Inputs; +import org.apache.pirk.test.utils.TestUtils; +import org.apache.pirk.utils.SystemConfiguration; +import org.apache.spark.launcher.SparkLauncher; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Distributed test class for PIR + * + */ +public class DistTestSuite +{ + private static final Logger logger = LoggerFactory.getLogger(DistTestSuite.class); + + // This method also tests all non-query specific configuration options/properties + // for the MapReduce version of PIR + public static void testJSONInputMR(FileSystem fs, List<JSONObject> dataElements) throws Exception + { + logger.info("Starting testJSONInputMR"); + + // Pull original data and query schema properties + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + 
SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "100"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + // Set up base configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT); + SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY)); + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0"); + + // Run tests + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + + BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2); + + // Test hit limits per selector + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 3); + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + // Test the local cache for modular exponentiation + SystemConfiguration.setProperty("pir.useLocalCache", "true"); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2); + SystemConfiguration.setProperty("pir.useLocalCache", "false"); + + // Change query for NXDOMAIN + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:3"); + BaseTests.testDNSNXDOMAINQuery(dataElements, 
fs, false, true, 2); + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, false, true, 2); + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0"); + + // Test the expTable cases + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + + // In memory table + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "true"); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + + // Create exp table in hdfs + SystemConfiguration.setProperty("mapreduce.map.memory.mb", "10000"); + SystemConfiguration.setProperty("mapreduce.reduce.memory.mb", "10000"); + SystemConfiguration.setProperty("mapreduce.map.java.opts", "-Xmx9000m"); + SystemConfiguration.setProperty("mapreduce.reduce.java.opts", "-Xmx9000m"); + + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + SystemConfiguration.setProperty("pir.expCreationSplits", "50"); + SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150"); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + + // Reset exp properties + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + + // Reset property + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + + // Test embedded QuerySchema + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + 
SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + + logger.info("Completed testJSONInputMR"); + } + + public static void testESInputMR(FileSystem fs, List<JSONObject> dataElements) throws Exception + { + logger.info("Starting testESInputMR"); + + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + // Set up ES configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES); + SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0"); + SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_RESOURCE_PROPERTY)); + + // Run tests + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); + BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1); + + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + + // Change query for NXDOMAIN + SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3"); + + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, false, true, 3); + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, false, true, 3); 
+ + logger.info("Completed testESInputMR"); + } + + public static void testJSONInputSpark(FileSystem fs, List<JSONObject> dataElements) throws Exception + { + logger.info("Starting testJSONInputSpark"); + + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + SystemConfiguration.setProperty("pir.useModExpJoin", "false"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + SystemConfiguration.setProperty("pir.numColMultPartitions", "20"); + SystemConfiguration.setProperty("pir.colMultReduceByKey", "false"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + // Set up JSON configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT); + SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY)); + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0"); + + // Run tests + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1); + + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); + BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2); + + BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2); + + // Test embedded QuerySchema + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + + 
SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + // Test pad columns + SystemConfiguration.setProperty("pir.padEmptyColumns", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + SystemConfiguration.setProperty("pir.padEmptyColumns", "false"); + + // Test hit limits per selector + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 3); + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + // Test the local cache for modular exponentiation + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + SystemConfiguration.setProperty("pir.useLocalCache", "true"); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3); + + // Test the join functionality for the modular exponentiation table + SystemConfiguration.setProperty("pir.useModExpJoin", "true"); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3); + SystemConfiguration.setProperty("pir.useModExpJoin", "false"); + + // Test file based exp lookup table for modular exponentiation + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true"); + SystemConfiguration.setProperty("pir.expCreationSplits", "500"); + SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150"); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); + 
SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + + // Change query for NXDOMAIN + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:3"); + + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, true, true, 3); + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, true, true, 3); + + // Test with reduceByKey for column mult + SystemConfiguration.setProperty("pir.colMultReduceByKey", "true"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, true, true, 3); + + logger.info("Completed testJSONInputSpark"); + } + + public static void testESInputSpark(FileSystem fs, List<JSONObject> dataElements) throws Exception + { + logger.info("Starting testESInputSpark"); + + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + // Set up ES configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES); + SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0"); + SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_PIR_INPUT_RESOURCE_PROPERTY)); + + // Run tests + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1); + BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2); + + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2); + 
BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); + + // Change query for NXDOMAIN + SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3"); + + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, true, true, 3); + SystemConfiguration.setProperty("pirTest.embedSelector", "false"); + BaseTests.testDNSNXDOMAINQuery(dataElements, fs, true, true, 3); + + logger.info("Completed testESInputSpark"); + } + + // Base method to perform query + public static List<QueryResponseJSON> performQuery(String queryType, ArrayList<String> selectors, FileSystem fs, boolean isSpark, int numThreads) + throws Exception + { + logger.info("performQuery: "); + + String queryInputDir = SystemConfiguration.getProperty(DistributedTestDriver.PIR_QUERY_INPUT_DIR); + String outputFile = SystemConfiguration.getProperty(DistributedTestDriver.OUTPUT_DIRECTORY_PROPERTY); + fs.delete(new Path(outputFile), true); // Ensure old output does not exist. 
+ + SystemConfiguration.setProperty("pir.queryInput", queryInputDir); + SystemConfiguration.setProperty("pir.outputFile", outputFile); + SystemConfiguration.setProperty("pir.numReduceTasks", "1"); + SystemConfiguration.setProperty("pir.stopListFile", SystemConfiguration.getProperty(DistributedTestDriver.PIR_STOPLIST_FILE)); + + // Create the temp result file + File fileFinalResults = File.createTempFile("finalResultsFile", ".txt"); + fileFinalResults.deleteOnExit(); + logger.info("fileFinalResults = " + fileFinalResults.getAbsolutePath()); + + boolean embedSelector = SystemConfiguration.getBooleanProperty("pirTest.embedSelector", false); + boolean useExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useExpLookupTable", false); + boolean useHDFSExpLookupTable = SystemConfiguration.getBooleanProperty("pirTest.useHDFSExpLookupTable", false); + + // Set the necessary objects + QueryInfo queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, + queryType, useExpLookupTable, embedSelector, useHDFSExpLookupTable); + + Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty); + + // Perform the encryption + logger.info("Performing encryption of the selectors - forming encrypted query vectors:"); + EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier); + encryptQuery.encrypt(numThreads); + logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:"); + + // Grab the necessary objects + Querier querier = encryptQuery.getQuerier(); + Query query = encryptQuery.getQuery(); + + // Write the Querier object to a file + Path queryInputDirPath = new Path(queryInputDir); + new HadoopFileSystemStore(fs).store(queryInputDirPath, query); + fs.deleteOnExit(queryInputDirPath); + + // Grab the original data and query schema properties to reset upon completion + String dataSchemaProp = 
SystemConfiguration.getProperty("data.schemas"); + String querySchemaProp = SystemConfiguration.getProperty("query.schemas"); + + // Get the correct input format class name + JSONInputFormatBase jFormat = new JSONInputFormatBase(); + String jsonBaseInputFormatString = jFormat.getClass().getName(); + SystemConfiguration.setProperty("pir.baseInputFormat", jsonBaseInputFormatString); + + // Submitting the tool for encrypted query + logger.info("Performing encrypted query:"); + if (isSpark) + { + // Build args + String inputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); + logger.info("inputFormat = " + inputFormat); + ArrayList<String> args = new ArrayList<>(); + args.add("-" + ResponderProps.PLATFORM + "=spark"); + args.add("-" + ResponderProps.DATAINPUTFORMAT + "=" + inputFormat); + args.add("-" + ResponderProps.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput")); + args.add("-" + ResponderProps.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile")); + args.add("-" + ResponderProps.STOPLISTFILE + "=" + SystemConfiguration.getProperty("pir.stopListFile")); + args.add("-" + ResponderProps.USELOCALCACHE + "=" + SystemConfiguration.getProperty("pir.useLocalCache", "true")); + args.add("-" + ResponderProps.LIMITHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.limitHitsPerSelector", "false")); + args.add("-" + ResponderProps.MAXHITSPERSELECTOR + "=" + SystemConfiguration.getProperty("pir.maxHitsPerSelector", "1000")); + args.add("-" + ResponderProps.QUERYSCHEMAS + "=" + Inputs.HDFS_QUERY_FILES); + args.add("-" + ResponderProps.DATASCHEMAS + "=" + Inputs.DATA_SCHEMA_FILE_HDFS); + args.add("-" + ResponderProps.NUMEXPLOOKUPPARTS + "=" + SystemConfiguration.getProperty("pir.numExpLookupPartitions", "100")); + args.add("-" + ResponderProps.USEMODEXPJOIN + "=" + SystemConfiguration.getProperty("pir.useModExpJoin", "false")); + args.add("-" + ResponderProps.NUMCOLMULTPARTITIONS + "=" + 
SystemConfiguration.getProperty("pir.numColMultPartitions", "20")); + args.add("-" + ResponderProps.COLMULTREDUCEBYKEY + "=" + SystemConfiguration.getProperty("pir.colMultReduceByKey", "false")); + if (inputFormat.equals(InputFormatConst.BASE_FORMAT)) + { + args.add("-" + ResponderProps.INPUTDATA + "=" + SystemConfiguration.getProperty("pir.inputData")); + args.add("-" + ResponderProps.BASEQUERY + "=" + SystemConfiguration.getProperty("pir.baseQuery")); + args.add("-" + ResponderProps.BASEINPUTFORMAT + "=" + SystemConfiguration.getProperty("pir.baseInputFormat")); + } + else if (inputFormat.equals(InputFormatConst.ES)) + { + args.add("-" + ResponderProps.ESQUERY + "=" + SystemConfiguration.getProperty("pir.esQuery")); + args.add("-" + ResponderProps.ESRESOURCE + "=" + SystemConfiguration.getProperty("pir.esResource")); + } + + for (String arg : args) + { + logger.info("arg = " + arg); + } + + // Run spark application + Process sLauncher = new SparkLauncher().setAppResource(SystemConfiguration.getProperty("jarFile")) + .setSparkHome(SystemConfiguration.getProperty("spark.home")).setMainClass("org.apache.pirk.responder.wideskies.ResponderDriver") + .addAppArgs(args.toArray(new String[args.size()])).setMaster("yarn-cluster").setConf(SparkLauncher.EXECUTOR_MEMORY, "2g") + .setConf(SparkLauncher.DRIVER_MEMORY, "2g").setConf(SparkLauncher.EXECUTOR_CORES, "1").launch(); + sLauncher.waitFor(); + } + else + { + SystemConfiguration.setProperty("data.schemas", Inputs.DATA_SCHEMA_FILE_HDFS); + SystemConfiguration.setProperty("query.schemas", Inputs.HDFS_QUERY_FILES); + + ComputeResponseTool responseTool = new ComputeResponseTool(); + ToolRunner.run(responseTool, new String[] {}); + } + logger.info("Completed encrypted query"); + + // Perform decryption + // Reconstruct the necessary objects from the files + logger.info("Performing decryption; writing final results file"); + Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class); + + // Perform 
decryption and output the result file + DecryptResponse decryptResponse = new DecryptResponse(response, querier); + decryptResponse.decrypt(numThreads); + decryptResponse.writeResultFile(fileFinalResults); + logger.info("Completed performing decryption and writing final results file"); + + // Read in results + logger.info("Reading in and checking results"); + List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults); + + // Reset data and query schema properties + SystemConfiguration.setProperty("data.schemas", dataSchemaProp); + SystemConfiguration.setProperty("query.schemas", querySchemaProp); + + // Clean up output dir in hdfs + fs.delete(new Path(outputFile), true); + + return results; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/9244df72/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java new file mode 100644 index 0000000..a55ed4d --- /dev/null +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.pirk.test.utils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.pirk.query.wideskies.QueryUtils;
import org.apache.pirk.schema.query.QuerySchema;
import org.apache.pirk.schema.query.QuerySchemaRegistry;
import org.apache.pirk.schema.response.QueryResponseJSON;
import org.apache.pirk.test.distributed.testsuite.DistTestSuite;
import org.apache.pirk.utils.StringUtils;
import org.apache.pirk.utils.SystemConfiguration;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import static org.junit.Assert.fail;

/**
 * Class to hold the base functional distributed tests.
 * <p>
 * Each {@code test*Query} method runs one query type either through {@link DistTestSuite#performQuery} (distributed) or
 * {@link StandaloneQuery#performStandaloneQuery} (standalone), rebuilds the expected results from the supplied data elements and fails via
 * {@link org.junit.Assert#fail(String)} when the returned results differ.
 */
public class BaseTests
{
  private static final Logger logger = LoggerFactory.getLogger(BaseTests.class);

  public static final UUID queryIdentifier = UUID.randomUUID();
  public static final int dataPartitionBitSize = 8;

  // Selectors for the domain (hostname) and IP queries
  private static final ArrayList<String> selectorsDomain = new ArrayList<>(
      Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
  private static final ArrayList<String> selectorsIP = new ArrayList<>(Arrays.asList("55.55.55.55", "5.6.7.8", "10.20.30.40", "13.14.15.16", "21.22.23.24"));

  // Encryption variables -- Paillier mechanisms are tested in the Paillier test code, so these are fixed...
  public static final int hashBitSize = 12;
  public static final String hashKey = "someKey";
  public static final int paillierBitSize = 384;
  public static final int certainty = 128;

  /**
   * Standalone (non-distributed, non-Spark) variant of the hostname query test.
   *
   * @param dataElements
   *          the test data elements
   * @param numThreads
   *          number of threads handed to the query
   * @param testFalsePositive
   *          forwarded to {@link StandaloneQuery#performStandaloneQuery}; also lowers the expected result count from 7 to 6
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, int numThreads, boolean testFalsePositive) throws Exception
  {
    testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive);
  }

  /**
   * Hostname query test with {@code testFalsePositive == false}.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
      throws Exception
  {
    testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false);
  }

  /**
   * Query for occurrences of the watched hostnames; watched value type: hostname (String).
   * <p>
   * In the distributed case with {@code pir.limitHitsPerSelector} set, only the set of returned qnames is checked (exactly three results are
   * required). Otherwise the full expected results are rebuilt from {@code dataElements} and compared.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @param testFalsePositive
   *          forwarded to the standalone query; when true one more tail element is excluded from the expected hits
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
      boolean testFalsePositive) throws Exception
  {
    logger.info("Running testDNSHostnameQuery(): ");

    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);

    int numExpectedResults = 6;
    List<QueryResponseJSON> results;
    if (isDistributed)
    {
      results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads);
    }
    else
    {
      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, numThreads, testFalsePositive);
      if (!testFalsePositive)
      {
        numExpectedResults = 7; // all 7 for non distributed case; if testFalsePositive==true, then 6
      }
    }
    logger.info("results:");
    printResultList(results);

    if (isDistributed && SystemConfiguration.isSetTrue("pir.limitHitsPerSelector"))
    {
      // 3 elements returned - one for each qname -- a.b.c.com, d.e.com, something.else
      if (results.size() != 3)
      {
        fail("results.size() = " + results.size() + " -- must equal 3");
      }

      // Check that each qname appears once in the result set
      HashSet<String> correctQnames = new HashSet<>();
      correctQnames.add("a.b.c.com");
      correctQnames.add("d.e.com");
      correctQnames.add("something.else");

      HashSet<String> resultQnames = new HashSet<>();
      for (QueryResponseJSON qrJSON : results)
      {
        resultQnames.add((String) qrJSON.getValue(Inputs.QNAME));
      }

      if (correctQnames.size() != resultQnames.size())
      {
        fail("correctQnames.size() = " + correctQnames.size() + " != resultQnames.size() " + resultQnames.size());
      }

      for (String resultQname : resultQnames)
      {
        if (!correctQnames.contains(resultQname))
        {
          fail("correctQnames does not contain resultQname = " + resultQname);
        }
      }
    }
    else
    {
      if (results.size() != numExpectedResults)
      {
        fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults);
      }

      // Number of original elements at the end of the list that we do not need to consider for hits
      int removeTailElements = testFalsePositive ? 3 : 2; // the last two data elements should not hit

      ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
      int numToConsider = dataElements.size() - removeTailElements;
      for (int i = 0; i < numToConsider; ++i)
      {
        JSONObject dataMap = dataElements.get(i);

        // rcode 3 elements are not expected back in the distributed case
        if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3"))
        {
          continue;
        }

        QueryResponseJSON wlJSON = new QueryResponseJSON();
        wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier.toString());
        wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_HOSTNAME_QUERY);
        wlJSON.setMapping(Inputs.DATE, dataMap.get(Inputs.DATE));
        wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
        wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
        wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption
        wlJSON.setMapping(Inputs.QTYPE, parseShortArray(dataMap, Inputs.QTYPE));
        wlJSON.setMapping(Inputs.RCODE, dataMap.get(Inputs.RCODE));
        wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
        wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
        correctResults.add(wlJSON);
      }
      logger.info("correctResults: ");
      printResultList(correctResults);

      assertResultsMatch(results, correctResults);
    }
    logger.info("Completed testDNSHostnameQuery(): ");
  }

  /**
   * Standalone (non-distributed, non-Spark) variant of the DNS IP query test.
   *
   * @param dataElements
   *          the test data elements
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
  {
    testDNSIPQuery(dataElements, null, false, false, numThreads);
  }

  /**
   * The watched IP address was detected in the response to a query; watched value type: IP address (String).
   * <p>
   * Requires 5 results in the distributed case and 6 in the standalone case before comparing against the expected results.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
  {
    logger.info("Running testDNSIPQuery(): ");

    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_IP_QUERY);
    List<QueryResponseJSON> results;

    if (isDistributed)
    {
      results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads);

      if (results.size() != 5)
      {
        fail("results.size() = " + results.size() + " -- must equal 5");
      }
    }
    else
    {
      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_IP_QUERY, selectorsIP, numThreads, false);

      if (results.size() != 6)
      {
        fail("results.size() = " + results.size() + " -- must equal 6");
      }
    }
    printResultList(results);

    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
    int numToConsider = dataElements.size() - 3; // last three data elements not hit - one on stoplist, two don't match selectors
    for (int i = 0; i < numToConsider; ++i)
    {
      JSONObject dataMap = dataElements.get(i);

      // rcode 3 elements are not expected back in the distributed case
      if (isDistributed && dataMap.get(Inputs.RCODE).toString().equals("3"))
      {
        continue;
      }

      QueryResponseJSON wlJSON = new QueryResponseJSON();
      wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
      wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_IP_QUERY);
      wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
      wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
      wlJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
      wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
      correctResults.add(wlJSON);
    }

    assertResultsMatch(results, correctResults);
    logger.info("Completed testDNSIPQuery(): ");
  }

  /**
   * Standalone (non-distributed, non-Spark) variant of the NXDOMAIN query test.
   *
   * @param dataElements
   *          the test data elements
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSNXDOMAINQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
  {
    testDNSNXDOMAINQuery(dataElements, null, false, false, numThreads);
  }

  /**
   * A query that returned an nxdomain response was made for the watched hostname; watched value type: hostname (String).
   * <p>
   * Exactly one result is required; the expected results are the data elements whose rcode is {@code "3"}.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testDNSNXDOMAINQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
      throws Exception
  {
    logger.info("Running testDNSNXDOMAINQuery(): ");

    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_NXDOMAIN_QUERY);
    List<QueryResponseJSON> results;

    if (isDistributed)
    {
      results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads);
    }
    else
    {
      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, numThreads, false);
    }
    printResultList(results);

    if (results.size() != 1)
    {
      fail("results.size() = " + results.size() + " -- must equal 1");
    }

    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
    for (JSONObject dataMap : dataElements)
    {
      if (dataMap.get(Inputs.RCODE).toString().equals("3"))
      {
        QueryResponseJSON wlJSON = new QueryResponseJSON();
        wlJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
        wlJSON.setMapping(QueryResponseJSON.EVENT_TYPE, Inputs.DNS_NXDOMAIN_QUERY);
        wlJSON.setMapping(Inputs.QNAME, dataMap.get(Inputs.QNAME)); // this gets re-embedded as the original selector after decryption
        wlJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
        wlJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
        wlJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
        correctResults.add(wlJSON);
      }
    }

    assertResultsMatch(results, correctResults);
    logger.info("Completed testDNSNXDOMAINQuery(): ");
  }

  /**
   * Standalone (non-distributed, non-Spark) variant of the source IP query test.
   *
   * @param dataElements
   *          the test data elements
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
  {
    testSRCIPQuery(dataElements, null, false, false, numThreads);
  }

  /**
   * Query for responses from watched srcIPs.
   * <p>
   * Expects 1 result in the distributed case (the last two data elements are excluded from the expected hits) and 3 in the standalone case.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
  {
    logger.info("Running testSRCIPQuery(): ");

    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY);
    List<QueryResponseJSON> results;

    int removeTailElements = 0;
    int numExpectedResults = 1;
    if (isDistributed)
    {
      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads);
      removeTailElements = 2; // The last two elements are on the distributed stoplist
    }
    else
    {
      numExpectedResults = 3;
      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY, selectorsIP, numThreads, false);
    }
    printResultList(results);

    if (results.size() != numExpectedResults)
    {
      fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults);
    }

    ArrayList<QueryResponseJSON> correctResults = buildExpectedSrcIpResults(dataElements, dataElements.size() - removeTailElements, Inputs.DNS_SRCIP_QUERY,
        qSchema);
    logger.info("correctResults:");
    printResultList(correctResults);

    assertResultsMatch(results, correctResults);
    logger.info("Completed testSRCIPQuery(): ");
  }

  /**
   * Query for responses from watched srcIPs, using the {@code DNS_SRCIP_QUERY_NO_FILTER} query type.
   * <p>
   * Expects 3 results in both the distributed and the standalone case; every data element is considered for the expected hits.
   *
   * @param dataElements
   *          the test data elements
   * @param fs
   *          file system used by the distributed query
   * @param isSpark
   *          whether the distributed query runs on Spark
   * @param isDistributed
   *          whether to run the distributed or the standalone query
   * @param numThreads
   *          number of threads handed to the query
   * @throws Exception
   *           propagated from the query
   */
  public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
      throws Exception
  {
    logger.info("Running testSRCIPQueryNoFilter(): ");

    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_SRCIP_QUERY_NO_FILTER);
    List<QueryResponseJSON> results;

    int numExpectedResults = 3;
    if (isDistributed)
    {
      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads);
    }
    else
    {
      results = StandaloneQuery.performStandaloneQuery(dataElements, Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, numThreads, false);
    }
    printResultList(results);

    if (results.size() != numExpectedResults)
    {
      fail("results.size() = " + results.size() + " -- must equal " + numExpectedResults);
    }

    ArrayList<QueryResponseJSON> correctResults = buildExpectedSrcIpResults(dataElements, dataElements.size(), Inputs.DNS_SRCIP_QUERY_NO_FILTER, qSchema);
    logger.info("correctResults:");
    printResultList(correctResults);

    assertResultsMatch(results, correctResults);
    logger.info("Completed testSRCIPQueryNoFilter(): ");
  }

  // Builds the expected source IP query results from the first numToConsider data elements whose srcIP is 55.55.55.55 or 5.6.7.8
  private static ArrayList<QueryResponseJSON> buildExpectedSrcIpResults(List<JSONObject> dataElements, int numToConsider, String queryType,
      QuerySchema qSchema)
  {
    ArrayList<QueryResponseJSON> correctResults = new ArrayList<>();
    for (int i = 0; i < numToConsider; ++i)
    {
      JSONObject dataMap = dataElements.get(i);

      String srcIP = dataMap.get(Inputs.SRCIP).toString();
      if (srcIP.equals("55.55.55.55") || srcIP.equals("5.6.7.8"))
      {
        // Form the correct result QueryResponseJSON object
        QueryResponseJSON qrJSON = new QueryResponseJSON();
        qrJSON.setMapping(QueryResponseJSON.QUERY_ID, queryIdentifier);
        qrJSON.setMapping(QueryResponseJSON.EVENT_TYPE, queryType);
        qrJSON.setMapping(Inputs.QNAME, parseString(dataMap, Inputs.QNAME));
        qrJSON.setMapping(Inputs.DSTIP, dataMap.get(Inputs.DSTIP));
        qrJSON.setMapping(Inputs.SRCIP, dataMap.get(Inputs.SRCIP));
        qrJSON.setMapping(Inputs.IPS, parseArray(dataMap, Inputs.IPS, true));
        qrJSON.setMapping(QueryResponseJSON.SELECTOR, QueryUtils.getSelectorByQueryTypeJSON(qSchema, dataMap));
        correctResults.add(qrJSON);
      }
    }
    return correctResults;
  }

  // Fails unless results and correctResults have the same size and every result has an equivalent entry in correctResults
  private static void assertResultsMatch(List<QueryResponseJSON> results, ArrayList<QueryResponseJSON> correctResults)
  {
    if (results.size() != correctResults.size())
    {
      logger.info("correctResults:");
      printResultList(correctResults);
      fail("results.size() = " + results.size() + " != correctResults.size() = " + correctResults.size());
    }
    for (QueryResponseJSON result : results)
    {
      if (!compareResultArray(correctResults, result))
      {
        fail("correctResults does not contain result = " + result.toString());
      }
    }
  }

  // Method to convert a ArrayList<String> into the correct (padded) returned ArrayList; the list is cut or padded to
  // pir.numReturnArrayElements entries, padding with "0.0.0.0" for IPs and "0" otherwise
  @SuppressWarnings("unchecked")
  private static ArrayList<String> parseArray(JSONObject dataMap, String fieldName, boolean isIP)
  {
    ArrayList<String> retArray = new ArrayList<>();

    Object rawValue = dataMap.get(fieldName);
    ArrayList<String> values;
    if (rawValue instanceof ArrayList)
    {
      values = (ArrayList<String>) rawValue;
    }
    else
    {
      values = StringUtils.jsonArrayStringToArrayList((String) rawValue);
    }

    String padValue = isIP ? "0.0.0.0" : "0";
    int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1);
    for (int i = 0; i < numArrayElementsToReturn; ++i)
    {
      retArray.add(i < values.size() ? values.get(i) : padValue);
    }

    return retArray;
  }

  // Method to convert a ArrayList<Short> into the correct (padded) returned ArrayList; the list is cut or padded with 0 to
  // pir.numReturnArrayElements entries
  @SuppressWarnings("unchecked")
  private static ArrayList<Short> parseShortArray(JSONObject dataMap, String fieldName)
  {
    ArrayList<Short> retArray = new ArrayList<>();

    ArrayList<Short> values = (ArrayList<Short>) dataMap.get(fieldName);

    int numArrayElementsToReturn = SystemConfiguration.getIntProperty("pir.numReturnArrayElements", 1);
    for (int i = 0; i < numArrayElementsToReturn; ++i)
    {
      if (i < values.size())
      {
        retArray.add(values.get(i));
      }
      else
      {
        retArray.add((short) 0);
      }
    }

    return retArray;
  }

  // Method to convert the String field value to the correct returned substring: the first (pir.stringBits / dataPartitionBitSize) bytes
  private static String parseString(JSONObject dataMap, String fieldName)
  {
    String element = (String) dataMap.get(fieldName);
    int numParts = Integer.parseInt(SystemConfiguration.getProperty("pir.stringBits")) / dataPartitionBitSize;

    // Bound the slice by the byte length, not the character count: the slice is taken over bytes, and the two differ for multi-byte characters
    // NOTE(review): getBytes()/new String(byte[]) use the platform default charset here -- confirm it matches the charset used when partitioning
    byte[] elementBytes = element.getBytes();
    int len = Math.min(numParts, elementBytes.length);

    return new String(elementBytes, 0, len);
  }

  // Method to determine whether or not the correctResults contains an object equivalent to
  // the given result
  private static boolean compareResultArray(ArrayList<QueryResponseJSON> correctResults, QueryResponseJSON result)
  {
    for (QueryResponseJSON correct : correctResults)
    {
      if (compareResults(correct, result))
      {
        return true;
      }
    }
    return false;
  }

  // Method to test the equivalence of two test results: same key set, array-typed fields compared as sets of strings,
  // all other fields compared by their toString() values
  @SuppressWarnings("unchecked")
  private static boolean compareResults(QueryResponseJSON r1, QueryResponseJSON r2)
  {
    JSONObject jsonR1 = r1.getJSONObject();
    JSONObject jsonR2 = r2.getJSONObject();

    Set<String> r1KeySet = jsonR1.keySet();
    Set<String> r2KeySet = jsonR2.keySet();
    if (!r1KeySet.equals(r2KeySet))
    {
      return false;
    }

    for (String key : r1KeySet)
    {
      if (key.equals(Inputs.QTYPE) || key.equals(Inputs.IPS)) // array types
      {
        HashSet<String> set1 = getSetFromList(jsonR1.get(key));
        HashSet<String> set2 = getSetFromList(jsonR2.get(key));

        if (!set1.equals(set2))
        {
          return false;
        }
      }
      else if (!(jsonR1.get(key).toString()).equals(jsonR2.get(key).toString()))
      {
        return false;
      }
    }
    return true;
  }

  // Method to pull the elements of a list (either an ArrayList or JSONArray) into a HashSet
  private static HashSet<String> getSetFromList(Object list)
  {
    HashSet<String> set = new HashSet<>();

    if (list instanceof ArrayList)
    {
      for (Object obj : (ArrayList<?>) list)
      {
        set.add(obj.toString());
      }
    }
    else
    // JSONArray
    {
      for (Object obj : (JSONArray) list)
      {
        set.add(obj.toString());
      }
    }

    return set;
  }

  // Logs each result in the list at info level
  private static void printResultList(List<QueryResponseJSON> list)
  {
    for (QueryResponseJSON obj : list)
    {
      logger.info(obj.toString());
    }
  }
}
