Repository: incubator-pirk
Updated Branches:
  refs/heads/master 45b2da870 -> 5cc5e7c24
Fix deprecated API calls and other minor fixes - closes apache/incubator-pirk#84

Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/5cc5e7c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/5cc5e7c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/5cc5e7c2

Branch: refs/heads/master
Commit: 5cc5e7c24728695d4c2cdc8bd88555c52e60887c
Parents: 45b2da8
Author: smarthi <[email protected]>
Authored: Fri Sep 2 15:06:00 2016 -0400
Committer: eawilliams <[email protected]>
Committed: Fri Sep 2 15:06:00 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../responder/wideskies/ResponderDriver.java    |  4 +-
 .../responder/wideskies/ResponderProps.java     | 70 ++++++++++---------
 .../wideskies/common/ComputeEncryptedRow.java   | 10 +--
 .../mapreduce/ComputeResponseTool.java          | 12 ++--
 .../wideskies/spark/ComputeResponse.java        | 14 ++--
 .../spark/EncColMultGroupedMapper.java          |  2 +-
 .../wideskies/spark/EncColMultReducer.java      |  2 +-
 .../wideskies/spark/ExpKeyFilenameMap.java      |  3 +-
 .../responder/wideskies/spark/FilterData.java   |  2 +-
 .../streaming/ComputeStreamingResponse.java     | 71 +++++++++----------
 .../wideskies/storm/EncColMultBolt.java         | 18 +++--
 .../wideskies/storm/EncRowCalcBolt.java         | 26 +++----
 .../responder/wideskies/storm/OutputBolt.java   | 42 ++++++------
 .../wideskies/storm/PartitionDataBolt.java      | 13 +---
 .../wideskies/storm/PirkHashScheme.java         | 15 ++---
 .../responder/wideskies/storm/PirkTopology.java | 10 +--
 .../wideskies/storm/StormConstants.java         | 60 ++++++++---------
 .../responder/wideskies/storm/StormUtils.java   | 18 ++---
 .../pirk/schema/data/DataSchemaLoader.java      |  2 -
 .../org/apache/pirk/test/utils/TestUtils.java   |  8 +--
 .../pirk/storm/KafkaStormIntegrationTest.java   | 50 +++++++-------
 22 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51e16b9..3119282 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
     <hadoop.version>2.7.2</hadoop.version>
     <spark.version>1.6.1</spark.version>
     <apache-commons.version>3.3</apache-commons.version>
-    <elasticsearch.version>2.3.3</elasticsearch.version>
+    <elasticsearch.version>2.3.4</elasticsearch.version>
     <storm.version>1.0.1</storm.version>
     <kafka.version>0.9.0.1</kafka.version>
     <spark-streaming.version>2.0.0</spark-streaming.version>


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index f8e396b..6f34de5 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -50,9 +50,9 @@ public class ResponderDriver
 {
   private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);

-  enum Platform
+  private enum Platform
   {
-    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE;
+    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE
   }

   public static void main(String[] args) throws Exception
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 55846fd..f73fdbe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,9 +21,7 @@ package org.apache.pirk.responder.wideskies;
 import java.util.Arrays;
 import java.util.List;

-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.cli.Option;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.schema.data.DataSchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
@@ -62,14 +60,14 @@ public class ResponderProps
   public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
   public static final String USEMODEXPJOIN = "pir.useModExpJoin";
   public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
-  public static final String NUMREDUCETASKS = "pir.numReduceTasks";
-  public static final String MAPMEMORY = "mapreduce.map.memory.mb";
-  public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
-  public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
-  public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
-  public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+  static final String NUMREDUCETASKS = "pir.numReduceTasks";
+  static final String MAPMEMORY = "mapreduce.map.memory.mb";
+  static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+  static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+  static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+  static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
   public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
-  public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+  static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";

   // For Spark Streaming - optional
   public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
@@ -80,35 +78,35 @@ public class ResponderProps

   // Storm parameters
   // hdfs
-  static final String HDFSURI = "hdfs.uri";
-  static final String USEHDFS = "hdfs.use";
+  private static final String HDFSURI = "hdfs.uri";
+  private static final String USEHDFS = "hdfs.use";

   // kafka
-  static final String KAFKATOPIC = "kafka.topic";
-  static final String KAFKACLIENTID = "kafka.clientId";
-  static final String KAFKAZK = "kafka.zk";
-  static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+  private static final String KAFKATOPIC = "kafka.topic";
+  private static final String KAFKACLIENTID = "kafka.clientId";
+  private static final String KAFKAZK = "kafka.zk";
+  private static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";

   // pirk topo
-  static final String STORMTOPONAME = "storm.topoName";
-  static final String STORMWORKERS = "storm.workers";
-  static final String STORMNUMACKERS = "storm.numAckers";
-  static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
-  static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
-  static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
-  static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
-  static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
-  static final String STORMCHILDOPTS = "storm.worker.childOpts";
-  static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
-  static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
-  static final String STORMSPOUTPAR = "storm.spout.parallelism";
-  static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
-  static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
-  static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
-  static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
-  static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
-  static final String STORMSALTCOLUMNS = "storm.saltColumns";
-  static final String STORMNUMROWDIVS = "storm.rowDivs";
-
-  static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
+  private static final String STORMTOPONAME = "storm.topoName";
+  private static final String STORMWORKERS = "storm.workers";
+  private static final String STORMNUMACKERS = "storm.numAckers";
+  private static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+  private static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+  private static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
+  private static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+  private static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+  private static final String STORMCHILDOPTS = "storm.worker.childOpts";
+  private static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+  private static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+  private static final String STORMSPOUTPAR = "storm.spout.parallelism";
+  private static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
+  private static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
+  private static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
+  private static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+  private static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+  private static final String STORMSALTCOLUMNS = "storm.saltColumns";
+  private static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+  private static final String[] STORMPROPS = new String[] {HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
       STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
       STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, STORMSPLITPARTITIONS,
       STORMSALTCOLUMNS, STORMNUMROWDIVS};


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0065094..f63cd08 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -345,10 +345,10 @@ public class ComputeEncryptedRow
     {
       BigInteger part = dataPartitions.get(i);

-      BigInteger exp = null;
+      BigInteger exp;
       try
       {
-        exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+        exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
       } catch (ExecutionException e)
       {
         e.printStackTrace();
@@ -358,7 +358,7 @@ public class ComputeEncryptedRow
       logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", rowIndex, colCounter, part.toString(),
           part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2));

-      returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+      returnPairs.add(new Tuple2<>(colCounter, exp));

       ++colCounter;
     }
@@ -380,13 +380,13 @@ public class ComputeEncryptedRow
       BigInteger exp = null;
       try
       {
-        exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+        exp = expCache.get(new Tuple3<>(rowQuery, part, query.getNSquared()));
       } catch (ExecutionException e)
       {
         e.printStackTrace();
       }

-      returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+      returnPairs.add(new Tuple2<>(colCounter, exp));

       ++colCounter;
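Several of the hunks above are the same Java 7 cleanup: constructor calls that repeat their type arguments, such as new Tuple3<BigInteger,BigInteger,BigInteger>(...), are replaced with the diamond operator. A minimal self-contained sketch of the pattern; the class and values below are illustrative, not from the Pirk tree:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DiamondExample
{
  public static void main(String[] args)
  {
    // Pre-Java 7 style: type arguments repeated on the right-hand side.
    List<String> verbose = new ArrayList<String>();

    // Java 7+ diamond operator: the compiler infers the type arguments.
    // This is the change applied to the Tuple2/Tuple3, HashMap, and
    // LinkedList construction sites throughout this commit.
    List<String> concise = new ArrayList<>();
    Map<Long,String> byId = new HashMap<>();

    verbose.add("a");
    concise.add("b");
    byId.put(1L, "c");
  }
}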
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 1d06f86..ab41a47 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -110,8 +110,6 @@ public class ComputeResponseTool extends Configured implements Tool
   private String esQuery = "none";
   private String esResource = "none";

-  String dataSchema = "none";
-
   private Configuration conf = null;
   private FileSystem fs = null;

@@ -264,7 +262,7 @@ public class ComputeResponseTool extends Configured implements Tool
     // Run the job to generate the expTable
     // Job jobExp = new Job(mrConfig.getConfig(), "pirExp-" + pirWL.getWatchlistNum());
-    Job jobExp = new Job(conf, "pirExp-" + queryInfo.getIdentifier());
+    Job jobExp = Job.getInstance(conf, "pirExp-" + queryInfo.getIdentifier());

     jobExp.setSpeculativeExecution(false);
     jobExp.getConfiguration().set("mapreduce.map.speculative", "false");
@@ -312,7 +310,7 @@ public class ComputeResponseTool extends Configured implements Tool
     // Assemble the exp table from the output
     // element_index -> fileName
-    HashMap<Integer,String> expFileTable = new HashMap<>();
+    Map<Integer,String> expFileTable = new HashMap<>();
     FileStatus[] status = fs.listStatus(outPathExp);
     for (FileStatus fstat : status)
     {
@@ -352,7 +350,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;

-    Job job = new Job(conf, "pirMR");
+    Job job = Job.getInstance(conf, "pirMR");
     job.setSpeculativeExecution(false);

     // Set the data and query schema properties
@@ -445,7 +443,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;

-    Job columnMultJob = new Job(conf, "pir_columnMult");
+    Job columnMultJob = Job.getInstance(conf, "pir_columnMult");
     columnMultJob.setSpeculativeExecution(false);

     String columnMultJobName = "pir_columnMult";
@@ -505,7 +503,7 @@ public class ComputeResponseTool extends Configured implements Tool
   {
     boolean success;

-    Job finalResponseJob = new Job(conf, "pir_finalResponse");
+    Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
     finalResponseJob.setSpeculativeExecution(false);

     String finalResponseJobName = "pir_finalResponse";


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f2a54d7..d6593f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -249,7 +249,7 @@ public class ComputeResponse

     JavaRDD<MapWritable> jsonRDD;

-    Job job = new Job();
+    Job job = Job.getInstance();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
     String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
@@ -283,8 +283,7 @@ public class ComputeResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return jsonRDD.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -303,7 +302,7 @@ public class ComputeResponse

     JavaRDD<MapWritable> jsonRDD;

-    Job job = new Job();
+    Job job = Job.getInstance();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
     job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -316,8 +315,7 @@ public class ComputeResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return jsonRDD.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -386,11 +384,11 @@ public class ComputeResponse
     JavaPairRDD<Long,BigInteger> encColRDD;
     if (colMultReduceByKey)
     {
-      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
     }
     else
     {
-      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
     }

     // Form the final response object
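The recurring MapReduce fix above replaces the Job constructor, deprecated since Hadoop 2.x, with the Job.getInstance factory. A minimal sketch of the before/after; the job name and configuration here are illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobFactoryExample
{
  public static Job createJob() throws IOException
  {
    Configuration conf = new Configuration();

    // Deprecated constructor, as previously used in ComputeResponseTool:
    // Job job = new Job(conf, "pirMR");

    // Replacement used throughout this commit; getInstance copies the
    // Configuration rather than mutating the one passed in.
    Job job = Job.getInstance(conf, "pirMR");
    job.setSpeculativeExecution(false);
    return job;
  }
}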
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 56c917c..cdab612 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -39,7 +39,7 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterable<BigInteger>>,Long,BigInteger>

   Query query = null;

-  public EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultGroupedMapper(BroadcastVars bbVarsIn)
   {
     query = bbVarsIn.getQuery();


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index 9242df7..9bde1f7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -37,7 +37,7 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInteger>

   Query query = null;

-  public EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultReducer(BroadcastVars bbVarsIn)
   {
     query = bbVarsIn.getQuery();


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
index cb95b5f..09b1e52 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
@@ -23,6 +23,7 @@ import java.io.OutputStreamWriter;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,7 +54,7 @@ public class ExpKeyFilenameMap implements PairFlatMapFunction<Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>>,Integer,String>
   @Override
   public Iterable<Tuple2<Integer,String>> call(Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>> iter) throws Exception
   {
-    ArrayList<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
+    List<Tuple2<Integer,String>> keyFileList = new ArrayList<>();

     FileSystem fs = FileSystem.get(new Configuration());


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index 0a8959a..a84fd49 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -42,7 +42,7 @@ public class FilterData implements Function<MapWritable,Boolean>
   private DataSchema dSchema = null;
   private Object filter = null;

-  public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn) throws Exception
+  public FilterData(Accumulators accumIn, BroadcastVars bbVarsIn)
   {
     accum = accumIn;


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index eaf7384..acb8682 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pirk.responder.wideskies.spark.streaming;

+import java.io.IOException;
 import java.math.BigInteger;
 import java.util.LinkedList;
 import java.util.List;
@@ -46,7 +47,6 @@ import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
-import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -85,7 +85,7 @@ public class ComputeStreamingResponse
   private String outputDirExp = null;

   private String queryInput = null;
-  QuerySchema qSchema = null;
+  private QuerySchema qSchema = null;

   private String esQuery = "none";
   private String esResource = "none";
@@ -94,17 +94,13 @@ public class ComputeStreamingResponse
   private HadoopFileSystemStore storage = null;
   private JavaStreamingContext jssc = null;

-  boolean useQueueStream = false;
+  private boolean useQueueStream = false;

-  private long batchSeconds = 0;
   private long windowLength = 0;

   private Accumulators accum = null;
   private BroadcastVars bVars = null;

-  private QueryInfo queryInfo = null;
-  Query query = null;
-
   private int numDataPartitions = 0;
   private int numColMultPartitions = 0;

@@ -154,7 +150,7 @@ public class ComputeStreamingResponse
         + " esResource = " + esResource);

     // Pull the batchSeconds and windowLength parameters
-    batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
+    long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
     windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
     if (windowLength % batchSeconds != 0)
     {
@@ -191,8 +187,8 @@ public class ComputeStreamingResponse
     bVars = new BroadcastVars(jssc.sparkContext());

     // Set the Query and QueryInfo broadcast variables
-    query = storage.recall(queryInput, Query.class);
-    queryInfo = query.getQueryInfo();
+    Query query = storage.recall(queryInput, Query.class);
+    QueryInfo queryInfo = query.getQueryInfo();
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);

@@ -258,7 +254,7 @@ public class ComputeStreamingResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws Exception
+  public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
   {
     logger.info("Performing query: ");

@@ -279,11 +275,11 @@ public class ComputeStreamingResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, Exception
+  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
   {
     logger.info("Reading data ");

-    Job job = new Job();
+    Job job = Job.getInstance();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
     String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
@@ -298,7 +294,7 @@ public class ComputeStreamingResponse
     Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
     if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
     {
-      throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
     }
     job.setInputFormatClass(inputClass);

@@ -306,10 +302,10 @@ public class ComputeStreamingResponse
     // Read data from hdfs
     logger.info("useQueueStream = " + useQueueStream);

-    JavaDStream<MapWritable> mwStream = null;
+    JavaDStream<MapWritable> mwStream;
     if (useQueueStream)
     {
-      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
       JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values()
           .coalesce(numDataPartitions);

@@ -334,8 +330,7 @@ public class ComputeStreamingResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return mwStream.filter(new FilterData(accum, bVars));
     }

     return mwStream;
@@ -345,11 +340,11 @@ public class ComputeStreamingResponse
    * Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readDataES() throws Exception
+  public JavaDStream<MapWritable> readDataES() throws IOException
   {
     logger.info("Reading data ");

-    Job job = new Job();
+    Job job = Job.getInstance();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
     job.setJobName(jobName);
     job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
@@ -358,10 +353,10 @@ public class ComputeStreamingResponse
     job.getConfiguration().set("es.query", esQuery);

     // Read data from hdfs
-    JavaDStream<MapWritable> mwStream = null;
+    JavaDStream<MapWritable> mwStream;
     if (useQueueStream)
     {
-      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<>();
       JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
           .coalesce(numDataPartitions);
       rddQueue.add(rddIn);
@@ -386,8 +381,7 @@ public class ComputeStreamingResponse
     // Filter out by the provided stopListFile entries
     if (qSchema.getFilter() != null)
     {
-      JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
-      return filteredRDD;
+      return mwStream.filter(new FilterData(accum, bVars));
     }
     else
     {
@@ -401,7 +395,7 @@ public class ComputeStreamingResponse
    * @throws InterruptedException
    *
    */
-  public void performQuery(JavaDStream<MapWritable> input) throws PIRException, InterruptedException
+  public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
   {
     logger.info("Performing query: ");

@@ -430,38 +424,33 @@ public class ComputeStreamingResponse
   }

   // Method to compute the final encrypted columns
-  private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD) throws PIRException
+  private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD)
   {
     // Multiply the column values by colNum: emit <colNum, finalColVal>
     JavaPairDStream<Long,BigInteger> encColRDD;
     if (colMultReduceByKey)
     {
-      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(bVars), numColMultPartitions);
     }
     else
     {
-      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(bVars));
     }

     // Update the output name, by batch number
     bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());

     // Form and write the response object
-    encColRDD.repartition(1).foreachRDD(new VoidFunction<JavaPairRDD<Long,BigInteger>>()
-    {
-      @Override
-      public void call(JavaPairRDD<Long,BigInteger> rdd)
-      {
-        rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
-
-        int maxBatchesVar = bVars.getMaxBatches();
-        if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
-        {
-          logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
-          System.exit(0);
-        }
+    encColRDD.repartition(1).foreachRDD((VoidFunction<JavaPairRDD<Long,BigInteger>>) rdd -> {
+      rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
+      int maxBatchesVar = bVars.getMaxBatches();
+      if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar)
+      {
+        logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
+        System.exit(0);
       }
     });
   }
 }
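The last hunk above swaps an anonymous inner VoidFunction for a Java 8 lambda with an explicit target-type cast. A minimal standalone sketch of the same transformation; the functional interface and accept method below are illustrative stand-ins for Spark's VoidFunction and the overloaded foreachRDD:

public class LambdaExample
{
  // Stand-in for org.apache.spark.api.java.function.VoidFunction<T>.
  interface VoidFunction<T>
  {
    void call(T t) throws Exception;
  }

  static void accept(Object action)
  {
    // Parameter typed as Object, mirroring an overload-ambiguous call site;
    // a bare lambda would not compile here without a target type.
  }

  public static void main(String[] args)
  {
    // Before: anonymous inner class.
    accept(new VoidFunction<String>()
    {
      @Override
      public void call(String s)
      {
        System.out.println(s);
      }
    });

    // After: a lambda, with a cast supplying the target type - the same
    // shape as the foreachRDD change in ComputeStreamingResponse.
    accept((VoidFunction<String>) s -> System.out.println(s));
  }
}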
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
index 8831e4e..90375aa 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -19,6 +19,10 @@

 package org.apache.pirk.responder.wideskies.storm;

+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -28,10 +32,6 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.slf4j.LoggerFactory;

-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Bolt class to perform encrypted column multiplication
  * <p>
@@ -60,9 +60,7 @@ public class EncColMultBolt extends BaseRichBolt
   private Long totalFlushSignals;

   // This is the main object here. It holds column Id -> aggregated product
-  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
-  private BigInteger colVal1;
-  private BigInteger colMult;
+  private Map<Long,BigInteger> resultsMap = new HashMap<>();

   @Override
   public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
@@ -93,7 +91,7 @@ public class EncColMultBolt extends BaseRichBolt
       resultsMap.clear();

       // Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed.
-      outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new Long(-1), BigInteger.valueOf(0)));
+      outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(-1L, BigInteger.ZERO));
       outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1));
       numFlushSignals = 0;
     }
@@ -103,13 +101,13 @@ public class EncColMultBolt extends BaseRichBolt

     // Data tuple received. Do column multiplication.
     long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
-    colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+    BigInteger colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);

     logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex);

     if (resultsMap.containsKey(colIndex))
     {
-      colMult = colVal1.multiply(resultsMap.get(colIndex));
+      BigInteger colMult = colVal1.multiply(resultsMap.get(colIndex));
       resultsMap.put(colIndex, colMult.mod(nSquared));
     }
     else


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
index 324fbf1..8a5b854 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -19,8 +19,17 @@

 package org.apache.pirk.responder.wideskies.storm;

+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -29,15 +38,8 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;

-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
+import scala.Tuple2;

 /**
  * Bolt class to perform the encrypted row calculation
@@ -69,8 +71,8 @@ public class EncRowCalcBolt extends BaseRichBolt
   private Random rand;

   // These are the main data structures used here.
-  private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
-  private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+  private Map<Integer,Integer> hitsByRow = new HashMap<>();
+  private Map<Integer,Integer> colIndexByRow = new HashMap<>();
   private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
   private List<BigInteger> dataArray = new ArrayList<>();

@@ -164,8 +166,8 @@ public class EncRowCalcBolt extends BaseRichBolt
    * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
    * colIndexByRow and hitsByRow appropriately.
    *
-   * @param tuple
-   * @return
+   * @param tuple {@code Tuple}
+   * @return {@code List<Tuple2>}
    */
   private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
   {


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
index 68b02f3..17cbee3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -19,6 +19,18 @@

 package org.apache.pirk.responder.wideskies.storm;

+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,17 +45,6 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.slf4j.LoggerFactory;

-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.net.URI;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Bolt to compute and output the final Response object for a query
  * <p>
@@ -65,11 +66,9 @@ public class OutputBolt extends BaseRichBolt
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);

   private OutputCollector outputCollector;
-  private QueryInfo queryInfo;
   private Response response;
   private String outputFile;
   private boolean hdfs;
-  private String hdfsUri;
   private Integer flushCounter = 0;
   private List<Tuple> tuplesToAck = new ArrayList<>();
   private Integer totalFlushSigs;
@@ -81,10 +80,7 @@ public class OutputBolt extends BaseRichBolt
   public static CountDownLatch latch = new CountDownLatch(4);

   // This is the main object here. It holds column Id -> product
-  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
-
-  private BigInteger colVal;
-  private BigInteger colMult;
+  private Map<Long,BigInteger> resultsMap = new HashMap<>();

   private BigInteger nSquared;

@@ -100,7 +96,7 @@ public class OutputBolt extends BaseRichBolt

     if (hdfs)
     {
-      hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+      String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
       try
       {
         FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
@@ -116,7 +112,7 @@ public class OutputBolt extends BaseRichBolt
       localStore = new LocalFileSystemStore();
     }
     nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
-    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
     response = new Response(queryInfo);

     logger.info("Intitialized OutputBolt.");
@@ -126,7 +122,7 @@ public class OutputBolt extends BaseRichBolt
   public void execute(Tuple tuple)
   {
     long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
-    colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+    BigInteger colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);

     // colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed it's values.
     // Could have created a new stream for such signals, but that seemed like overkill.
@@ -137,12 +133,12 @@ public class OutputBolt extends BaseRichBolt
       logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);

       // Wait till all EncColMultBolts have been flushed
-      if (flushCounter == totalFlushSigs)
+      if (Objects.equals(flushCounter, totalFlushSigs))
       {
         logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size());
         try
         {
-          String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date())).toString();
+          String timestamp = (new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()));
           for (long cv : resultsMap.keySet())
           {
             response.addElement((int) cv, resultsMap.get(cv));
@@ -182,7 +178,7 @@ public class OutputBolt extends BaseRichBolt
     // in which case a small number of multiplications still need to be done per column.
     if (resultsMap.containsKey(colIndex))
     {
-      colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+      BigInteger colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
       resultsMap.put(colIndex, colMult);
     }
     else


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
index 9d24620..bfa916f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -20,15 +20,11 @@
 package org.apache.pirk.responder.wideskies.storm;

 import java.math.BigInteger;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
-
-import org.apache.pirk.schema.data.DataSchema;
-import org.apache.pirk.schema.data.DataSchemaRegistry;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.storm.task.TopologyContext;
@@ -54,8 +50,6 @@ public class PartitionDataBolt extends BaseBasicBolt

   private static final long serialVersionUID = 1L;

-  private QueryInfo queryInfo;
-  private String queryType;
   private QuerySchema qSchema = null;

   private boolean embedSelector;
@@ -63,13 +57,12 @@ public class PartitionDataBolt extends BaseBasicBolt
   private boolean splitPartitions;

   private JSONObject json;
-  private List<BigInteger> partitions;

   @Override
   public void prepare(Map map, TopologyContext context)
   {
-    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
-    queryType = queryInfo.getQueryType();
+    QueryInfo queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    String queryType = queryInfo.getQueryType();
     embedSelector = queryInfo.getEmbedSelector();
     logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
     StormUtils.initializeSchemas(map, "partition");
@@ -102,7 +95,7 @@ public class PartitionDataBolt extends BaseBasicBolt

     try
     {
-      partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
+      List<BigInteger> partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);

       logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting results - {}", json.toString(), partitions.size());
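One fix worth calling out from the OutputBolt hunks above is a real bug fix, not just a cleanup: flushCounter and totalFlushSigs are boxed Integers, so == compares references and is only reliable for values in the small cached range. A minimal sketch of the failure mode and the Objects.equals remedy; the values are illustrative:

import java.util.Objects;

public class BoxedEqualityExample
{
  public static void main(String[] args)
  {
    Integer a = 1000;
    Integer b = 1000;

    // Reference comparison: typically false for values outside the
    // Integer cache (-128..127), which is why the flush-counter check
    // in OutputBolt could silently never fire.
    System.out.println(a == b);

    // Null-safe value comparison, as adopted in this commit.
    System.out.println(Objects.equals(a, b));
  }
}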
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
index 76bb80c..50d00c8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -18,28 +18,25 @@
  */
 package org.apache.pirk.responder.wideskies.storm;

+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.pirk.query.wideskies.QueryInfo;
 import org.apache.pirk.query.wideskies.QueryUtils;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.utils.KeyedHash;
-
 import org.apache.storm.Config;
 import org.apache.storm.kafka.StringScheme;
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.slf4j.LoggerFactory;

-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Scheme used by spout to retrieve and hash selector from JSON data on Kafka.
  */
@@ -51,7 +48,6 @@ public class PirkHashScheme extends StringScheme implements Scheme
   private QueryInfo queryInfo;

   transient private JSONParser parser;
-  transient private JSONObject json;
   private boolean initialized = false;
   private QuerySchema qSchema;
   private Config conf;
@@ -81,8 +77,9 @@ public class PirkHashScheme extends StringScheme implements Scheme
       initialized = true;
     }

-    String str = super.deserializeString(bytes);
+    String str = deserializeString(bytes);

+    JSONObject json;
     try
     {
       json = (JSONObject) parser.parse(str);


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index 6540ecc..e0f83d3 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -147,11 +147,11 @@ public class PirkTopology
     conf.setDebug(false);
     // conf.setNumEventLoggers(2);

-    conf.put(conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024")));
-    conf.put(conf.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024")));
-    conf.put(conf.TOPOLOGY_TRANSFER_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32")));
-    conf.put(conf.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750")));
-    conf.put(conf.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
+    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.receiveBufferSize", 1024));
+    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.executor.sendBufferSize", 1024));
+    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, SystemConfiguration.getIntProperty("storm.transferBufferSize", 32));
+    conf.put(Config.WORKER_HEAP_MEMORY_MB, SystemConfiguration.getIntProperty("storm.worker.heapMemory", 750));
+    conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));

     // Pirk parameters to send to bolts
     conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
index 7f1e59d..6ef6bc9 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -21,49 +21,49 @@ package org.apache.pirk.responder.wideskies.storm;
 public class StormConstants
 {
   // Topology Components
-  public static final String SPOUT_ID = "kafkaspout";
-  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
-  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
-  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
-  public static final String OUTPUTBOLT_ID = "outputbolt";
+  static final String SPOUT_ID = "kafkaspout";
+  static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  static final String OUTPUTBOLT_ID = "outputbolt";

   // Extra Streams
-  public static final String DEFAULT = "default";
-  public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
-  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
-  public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+  static final String DEFAULT = "default";
+  static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+  static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";

   // Tuple Fields
   // From HashBolt (and variants)
-  public static final String HASH_FIELD = "hash";
-  public static final String PARTIONED_DATA_FIELD = "parData";
-  public static final String JSON_DATA_FIELD = "data";
+  static final String HASH_FIELD = "hash";
+  static final String PARTIONED_DATA_FIELD = "parData";
+  static final String JSON_DATA_FIELD = "data";

   // From EncRowCalcBolt
-  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
-  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  static final String ENCRYPTED_VALUE_FIELD = "encRowValue";

   // From EncColMultBolt
-  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
-  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+  static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  static final String COLUMN_PRODUCT_FIELD = "colProduct";

   // Configuration Keys
   public static final String USE_HDFS = "useHdfs";
-  public static final String HDFS_URI_KEY = "hdfsUri";
-  public static final String QUERY_FILE_KEY = "queryFile";
+  static final String HDFS_URI_KEY = "hdfsUri";
+  static final String QUERY_FILE_KEY = "queryFile";
   public static final String QUERY_INFO_KEY = "queryInfo";
-  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
-  public static final String QSCHEMA_KEY = "qSchema";
-  public static final String DSCHEMA_KEY = "dschema";
+  static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+  static final String QSCHEMA_KEY = "qSchema";
+  static final String DSCHEMA_KEY = "dschema";
   public static final String OUTPUT_FILE_KEY = "output";
-  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
-  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
-  public static final String SALT_COLUMNS_KEY = "saltColumns";
-  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
-  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  static final String SALT_COLUMNS_KEY = "saltColumns";
+  static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
   public static final String N_SQUARED_KEY = "nSquared";
-  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
-  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+  static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+  static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";

-  public static final String SALT = "salt";
-  public static final String FLUSH = "flush";
+  static final String SALT = "salt";
+  static final String FLUSH = "flush";
 }


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
index 7fbca66..bbffaba 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pirk.responder.wideskies.storm;

+import java.net.URI;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.pirk.query.wideskies.Query;
@@ -30,9 +33,6 @@ import org.apache.storm.Constants;
 import org.apache.storm.tuple.Tuple;
 import org.slf4j.LoggerFactory;

-import java.net.URI;
-import java.util.Map;
-
 /**
  * Utils class for the Storm implementation of Wideskies
 */
@@ -43,14 +43,14 @@ public class StormUtils
   /**
    * Method to read in serialized Query object from the given queryFile
    *
-   * @param useHdfs
-   * @param hdfsUri
-   * @param queryFile
-   * @return
+   * @param useHdfs - true or false
+   * @param hdfsUri - HDFS path
+   * @param queryFile -
+   * @return {@link Query}
    */
   public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
   {
-    Query query = null;
+    Query query;

     try
     {
@@ -77,7 +77,7 @@ public class StormUtils
    * Method to read in and return a serialized Query object from the given file and initialize/load the query.schemas and data.schemas
    *
    * @param map
-   * @return
+   * @return {@link Query}
    */
   public static Query prepareQuery(Map map)
   {


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index c651eaa..4bac8b7 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,9 +31,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;

 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/main/java/org/apache/pirk/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/TestUtils.java b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
index be01fb4..05c9f28 100644
--- a/src/main/java/org/apache/pirk/test/utils/TestUtils.java
+++ b/src/main/java/org/apache/pirk/test/utils/TestUtils.java
@@ -21,7 +21,6 @@ package org.apache.pirk.test.utils;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -269,10 +268,9 @@ public class TestUtils
   /**
    * Converts the result file into an ArrayList of QueryResponseJSON objects
    *
-   * @throws IOException
-   * @throws FileNotFoundException
+   * @throws IOException - {@link IOException}
    */
-  public static List<QueryResponseJSON> readResultsFile(File file) throws FileNotFoundException, IOException
+  public static List<QueryResponseJSON> readResultsFile(File file) throws IOException
   {
     List<QueryResponseJSON> results = new ArrayList<>();
     try (BufferedReader br = new BufferedReader(new FileReader(file)))
@@ -290,7 +288,7 @@ public class TestUtils

   /**
    * Write the ArrayList<String to a tmp file in the local filesystem with the given fileName
-   *
+   * @throws IOException - {@link IOException}
    */
   public static String writeToTmpFile(List<String> list, String fileName, String suffix) throws IOException
   {
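The PirkTopology hunk a few files back makes two separate cleanups: the Storm Config constants are referenced statically through the Config class rather than through the conf instance, and the Integer.parseInt(getProperty(...)) idiom is collapsed into Pirk's typed getIntProperty accessor. A minimal sketch of the equivalent typed-accessor idiom over java.util.Properties; this class is an illustrative stand-in for Pirk's SystemConfiguration, whose exact implementation is not shown in the diff:

import java.util.Properties;

public class TypedPropsExample
{
  private static final Properties props = new Properties();

  // Equivalent of SystemConfiguration.getIntProperty(key, default) as used
  // in the PirkTopology hunk: parse once, fall back to the default when
  // the key is absent.
  static int getIntProperty(String key, int defaultValue)
  {
    String value = props.getProperty(key);
    return (value == null) ? defaultValue : Integer.parseInt(value);
  }

  public static void main(String[] args)
  {
    // Before: Integer.parseInt(getProperty("storm.executor.receiveBufferSize", "1024"))
    // After:  getIntProperty("storm.executor.receiveBufferSize", 1024)
    System.out.println(getIntProperty("storm.executor.receiveBufferSize", 1024));
  }
}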
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/5cc5e7c2/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
index fc4e703..425bb0b 100644
--- a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -18,6 +18,15 @@
  */
 package org.apache.pirk.storm;

+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -25,8 +34,8 @@ import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;

 import org.I0Itec.zkclient.ZkClient;
-
 import org.I0Itec.zkclient.ZkConnection;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -38,7 +47,10 @@ import org.apache.pirk.querier.wideskies.QuerierConst;
 import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
 import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
 import org.apache.pirk.query.wideskies.QueryInfo;
-import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.responder.wideskies.storm.OutputBolt;
+import org.apache.pirk.responder.wideskies.storm.PirkHashScheme;
+import org.apache.pirk.responder.wideskies.storm.PirkTopology;
+import org.apache.pirk.responder.wideskies.storm.StormConstants;
 import org.apache.pirk.response.wideskies.Response;
 import org.apache.pirk.schema.query.filter.StopListFilter;
 import org.apache.pirk.schema.response.QueryResponseJSON;
@@ -61,18 +73,12 @@ import org.apache.storm.testing.TestJob;
 import org.json.simple.JSONObject;

 import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.LoggerFactory;

-import java.io.File;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Properties;
-import java.util.HashMap;
-import java.util.Arrays;
-import java.util.ArrayList;
-
 @Category(IntegrationTest.class)
 public class KafkaStormIntegrationTest
 {
@@ -87,6 +93,9 @@ public class KafkaStormIntegrationTest
   private static final String topic = "pirk_test_topic";
   private static final String kafkaTmpDir = "/tmp/kafka";

+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
   private static File fileQuery;
   private static File fileQuerier;

@@ -122,7 +131,7 @@ public class KafkaStormIntegrationTest
     performEncryption();
     SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());

-    KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+    KafkaProducer<String,String> producer = new KafkaProducer<>(createKafkaProducerConfig());
     loadTestData(producer);

     logger.info("Test (splitPartitions,saltColumns) = (true,true)");
@@ -260,17 +269,13 @@ public class KafkaStormIntegrationTest

     zookeeperLocalCluster.stop();
     FileUtils.deleteDirectory(new File(kafkaTmpDir));
-
-    fileQuery.delete();
-    fileQuerier.delete();
-
   }

-  private HashMap<String,Object> createKafkaProducerConfig()
+  private Map<String,Object> createKafkaProducerConfig()
   {
     String kafkaHostName = "localhost";
-    Integer kafkaPorts = 11111;
-    HashMap<String,Object> config = new HashMap<String,Object>();
+    int kafkaPorts = 11111;
+    Map<String,Object> config = new HashMap<>();
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@@ -278,18 +283,17 @@ public class KafkaStormIntegrationTest
     return config;
   }

-  private void loadTestData(KafkaProducer producer)
+  private void loadTestData(KafkaProducer<String,String> producer)
   {
     for (JSONObject dataRecord : Inputs.createJSONDataElements())
     {
       logger.info("Sending record to Kafka " + dataRecord.toString());
-      producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+      producer.send(new ProducerRecord<>(topic, dataRecord.toString()));
     }
   }

   private void performEncryption() throws Exception
   {
-    // ArrayList<String> selectors = BaseTests.selectorsDomain;
     List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));

     String queryType = Inputs.DNS_HOSTNAME_QUERY;
@@ -307,8 +311,8 @@ public class KafkaStormIntegrationTest
     logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");

     // Write out files.
-    fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
-    fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+    fileQuerier = folder.newFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG + ".txt");
+    fileQuery = folder.newFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG + ".txt");

     localStore.store(fileQuerier.getAbsolutePath(), querier);
     localStore.store(fileQuery, querier.getQuery());
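The test change above replaces File.createTempFile plus manual delete() calls in teardown with a JUnit TemporaryFolder rule, which removes its files automatically after each test. A minimal sketch of the pattern; the class and file names below are illustrative, not from the Pirk tree:

import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TemporaryFolderExample
{
  // JUnit creates a fresh folder before each test and deletes it
  // (and everything in it) afterwards - no explicit cleanup needed,
  // which is why the fileQuery.delete()/fileQuerier.delete() calls
  // could be dropped from the integration test's teardown.
  @Rule
  public TemporaryFolder folder = new TemporaryFolder();

  @Test
  public void writesToTempFile() throws Exception
  {
    File out = folder.newFile("example.txt");
    // ... write test output to 'out'; JUnit removes the folder afterwards.
  }
}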
