http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java index 4fee85a..f5d24d7 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java @@ -21,6 +21,8 @@ package org.apache.pirk.responder.wideskies; import java.util.Arrays; import java.util.List; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.cli.Option; import org.apache.pirk.inputformat.hadoop.InputFormatConst; import org.apache.pirk.schema.data.DataSchemaLoader; @@ -76,10 +78,45 @@ public class ResponderProps public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches"; public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown"; - static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT, - OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS, + // Storm parameters + // hdfs + static final String HDFSURI = "hdfs.uri"; + static final String USEHDFS = "hdfs.use"; + // kafka + static final String KAFKATOPIC = "kafka.topic"; + static final String KAFKACLIENTID = "kafka.clientId"; + static final String KAFKAZK = "kafka.zk"; + static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart"; + // pirk topo + static final String STORMTOPONAME = "storm.topoName"; + static final String STORMWORKERS = "storm.workers"; + static final String STORMNUMACKERS = "storm.numAckers"; + static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize"; + static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize"; + static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize"; + static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending"; + static final String STORMHEAPMEMORY = "storm.worker.heapMemory"; + static final String STORMCHILDOPTS = "storm.worker.childOpts"; + static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory"; + static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem"; + static final String STORMSPOUTPAR = "storm.spout.parallelism"; + static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism"; + static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism"; + static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism"; + static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple"; + static final String STORMSPLITPARTITIONS = "storm.splitPartitions"; + static final String STORMSALTCOLUMNS = "storm.saltColumns"; + static final String STORMNUMROWDIVS = "storm.rowDivs"; + + static final String[] STORMPROPS = new String[]{HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS, + STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP, + STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, 
STORMFLUSHFREQUENCY, STORMSPLITPARTITIONS, + STORMSALTCOLUMNS, STORMNUMROWDIVS}; + + static final List<String> PROPSLIST = Arrays.asList((String[]) ArrayUtils.addAll(new String[]{PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE, + BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS, REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN, - COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY); + COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS)); /** * Validates the responder properties @@ -98,7 +135,7 @@ public class ResponderProps } String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase(); - if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone")) + if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm") && !platform.equals("standalone")) { logger.info("Unsupported platform: " + platform); valid = false;
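The PROPSLIST construction above folds the new Storm property names into the existing list via commons-lang's ArrayUtils.addAll, which concatenates two arrays into a new array typed after its first argument (hence the String[] cast). A minimal, self-contained sketch of the same idiom, with illustrative property names rather than the real ones from ResponderProps:

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;

public class PropsListSketch
{
  public static void main(String[] args)
  {
    // Illustrative names only; the real arrays live in ResponderProps
    String[] baseProps = new String[] {"pir.platform", "pir.queryInput"};
    String[] stormProps = new String[] {"storm.topoName", "storm.workers"};

    // addAll allocates a new array whose component type comes from the first argument,
    // so the cast back to String[] is safe
    List<String> propsList = Arrays.asList((String[]) ArrayUtils.addAll(baseProps, stormProps));

    System.out.println(propsList); // [pir.platform, pir.queryInput, storm.topoName, storm.workers]
  }
}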
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java index 0745bea..0050e29 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java @@ -44,7 +44,6 @@ import com.google.common.cache.LoadingCache; /** * Class to compute the encrypted row elements for a query from extracted data partitions - * */ public class ComputeEncryptedRow { @@ -98,7 +97,6 @@ public class ComputeEncryptedRow * Optionally uses a static LRU cache for the modular exponentiation * <p> * Emits {@code Tuple2<<colNum, colVal>>} - * */ public static List<Tuple2<Long,BigInteger>> computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException @@ -112,7 +110,7 @@ public class ComputeEncryptedRow int elementCounter = 0; for (BytesArrayWritable dataPartitions : dataPartitionsIter) { - logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter); + logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter); if (limitHitsPerSelector) { @@ -121,7 +119,7 @@ public class ComputeEncryptedRow break; } } - logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter); + logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter); // Update the associated column values for (int i = 0; i < dataPartitions.size(); ++i) @@ -142,8 +140,8 @@ public class ComputeEncryptedRow { e.printStackTrace(); } - logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = " - + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2)); + logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", + rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2)); returnPairs.add(new Tuple2<>(colCounter, exp)); @@ -162,7 +160,6 @@ public class ComputeEncryptedRow * Optionally uses a static LRU cache for the modular exponentiation * <p> * Emits {@code Tuple2<<colNum, colVal>>} - * */ public static List<Tuple2<Long,BigInteger>> computeEncRowBI(Iterable<List<BigInteger>> dataPartitionsIter, Query query, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException @@ -178,7 +175,7 @@ public class ComputeEncryptedRow { // long startTime = System.currentTimeMillis(); - logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter); + logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter); if (limitHitsPerSelector) { @@ -188,8 +185,7 @@ public class ComputeEncryptedRow break; } } - logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter); - + logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = 
{}", dataPartitions.size(), rowIndex, colCounter); // Update the associated column values for (int i = 0; i < dataPartitions.size(); ++i) { @@ -209,8 +205,9 @@ public class ComputeEncryptedRow { e.printStackTrace(); } - logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = " - + exp + " i = " + i); + + logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {}", + rowIndex, colCounter, part.toString(), part.toString(2), exp, i); returnPairs.add(new Tuple2<>(colCounter, exp)); @@ -234,7 +231,6 @@ public class ComputeEncryptedRow * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values. * <p> * Emits {@code Tuple2<<colNum, colVal>>} - * */ public static List<Tuple2<Long,BigInteger>> computeEncRowCacheInput(Iterable<List<BigInteger>> dataPartitionsIter, HashMap<Integer,BigInteger> cache, int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector) throws IOException @@ -245,7 +241,7 @@ public class ComputeEncryptedRow int elementCounter = 0; for (List<BigInteger> dataPartitions : dataPartitionsIter) { - logger.debug("elementCounter = " + elementCounter); + logger.debug("elementCounter = {}", elementCounter); if (limitHitsPerSelector) { @@ -254,7 +250,7 @@ public class ComputeEncryptedRow break; } } - logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter); + logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter); // Update the associated column values for (int i = 0; i < dataPartitions.size(); ++i) @@ -262,7 +258,7 @@ public class ComputeEncryptedRow BigInteger part = dataPartitions.get(i); BigInteger exp = cache.get(part.intValue()); - logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " exp = " + exp + " i = " + i); + logger.debug("rowIndex = {} colCounter = {} part = {} exp = {} i = {}", rowIndex, colCounter, part.toString(), exp, i); returnPairs.add(new Tuple2<>(colCounter, exp)); @@ -283,7 +279,6 @@ public class ComputeEncryptedRow * Caller is responsible for keeping track of the colIndex and the the maxHitsPerSelector values * <p> * Emits {@code Tuple2<<colNum, colVal>>} - * */ public static List<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable dataPartitions, Query query, int rowIndex, int colIndex) throws IOException { @@ -295,7 +290,7 @@ public class ComputeEncryptedRow // Initialize the column counter long colCounter = colIndex; - logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter); + logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter); // Update the associated column values for (int i = 0; i < dataPartitions.size(); ++i) @@ -311,8 +306,8 @@ public class ComputeEncryptedRow e.printStackTrace(); } - logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = " - + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2)); + logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", + rowIndex, colCounter, part.toString(), part.toString(2), exp, i, 
dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2)); returnPairs.add(new Tuple2<>(colCounter, exp)); @@ -321,4 +316,81 @@ public class ComputeEncryptedRow return returnPairs; } + + /** + * Method to compute the encrypted row elements for a query from extracted data partitions in the form of {@code List<BigInteger>} + * <p> + * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values. + * <p> + * Uses a static LRU cache for the modular exponentiation + * <p> + * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values + * <p> + * Emits {@code Tuple2<<colNum, colVal>>} + */ + public static List<Tuple2<Long,BigInteger>> computeEncRow(List<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex) + throws IOException + { + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + + // Pull the corresponding encrypted row query + BigInteger rowQuery = query.getQueryElement(rowIndex); + + // Initialize the column counter + long colCounter = colIndex; + + logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter); + + // Update the associated column values + for (int i = 0; i < dataPartitions.size(); ++i) + { + BigInteger part = dataPartitions.get(i); + + BigInteger exp = null; + try + { + exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared())); + } catch (ExecutionException e) + { + e.printStackTrace(); + break; + } + + logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}", + rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2)); + + returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp)); + + ++colCounter; + } + + return returnPairs; + } + + /** + * Method to compute the encrypted row element for a single data partition element + * <p> + * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values + * <p> + * Emits {@code Tuple2<<colNum, colVal>>} + */ + public static List<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException + { + List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>(); + + // Pull the corresponding encrypted row query + BigInteger rowQuery = query.getQueryElement(rowIndex); + + // Initialize the column counter + long colCounter = colIndex; + + // Update the associated column values + BigInteger exp = null; + try + { + exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared())); + } catch (ExecutionException e) + { + e.printStackTrace(); + } + + returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp)); + + ++colCounter; + + return returnPairs; + } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java index 61169f2..e605d94 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java @@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory; import scala.Tuple2; /** - * Given a MapWritable dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement, perform a keyed
hash - * of the selector, extract the partitions of the dataElement, and outputs {@code <hash(selector), dataPartitions>} + * Given a MapWritable or JSON formatted dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement, + * perform a keyed hash of the selector, extract the partitions of the dataElement, and output {@code <hash(selector), dataPartitions>} */ public class HashSelectorAndPartitionData { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index f34acf8..6014435 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -90,8 +90,9 @@ public class ComputeResponse private BroadcastVars bVars = null; private QueryInfo queryInfo = null; - Query query = null; - QuerySchema qSchema = null; + + private Query query = null; + private QuerySchema qSchema = null; private int numDataPartitions = 0; private int numColMultPartitions = 0; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java new file mode 100644 index 0000000..08c9917 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; + +/** + * Bolt class to perform encrypted column multiplication + * <p> + * Takes {@code <columnIndex, columnValue>} tuples as input and aggregates (multiplies) the columnValues for a given columnIndex as they are received.
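+ * <p>
+ * (The multiplication here is the Paillier homomorphic operation: ciphertexts are multiplied modulo N^2, which corresponds to adding the
+ * underlying plaintext column contributions.)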
+ * <p> + * EncRowCalcBolts send flush signals to the EncColMultBolts indicating that they have finished sending all tuples for a session. Whenever a flush signal is + * received from an EncRowCalcBolt, the number of received flush signals is tallied until each EncRowCalcBolt has emitted a flush signal. + * <p> + * Once a flush signal has been received from each EncRowCalcBolt, all {@code <columnIndex, aggregate colVal product>} tuples are sent to the OutputBolt and a session_end + * signal is sent back to each EncRowCalcBolt. + * <p> + * The EncRowCalcBolts buffer their output from the time that they send a flush signal to the EncColMultBolts until the time that they receive a session_end + * signal from all of the EncColMultBolts. + * + */ +public class EncColMultBolt extends BaseRichBolt +{ + private static final long serialVersionUID = 1L; + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncColMultBolt.class); + + private OutputCollector outputCollector; + + private BigInteger nSquared; + private long numFlushSignals; + private Long totalFlushSignals; + + // This is the main object here. It holds column Id -> aggregated product + private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>(); + private BigInteger colVal1; + private BigInteger colMult; + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) + { + outputCollector = collector; + String nSquare = (String) map.get(StormConstants.N_SQUARED_KEY); + nSquared = new BigInteger(nSquare); + totalFlushSignals = (Long) map.get(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY); + + logger.info("Initialized EncColMultBolt. "); + } + + @Override + public void execute(Tuple tuple) + { + if (tuple.getSourceStreamId().equals(StormConstants.ENCROWCALCBOLT_FLUSH_SIG)) + { + numFlushSignals += 1; + logger.debug("Received {} flush signals out of {}", numFlushSignals, totalFlushSignals); + + // Need to receive notice from all EncRowCalcBolts in order to flush. + if (numFlushSignals == totalFlushSignals) + { + logger.debug("Received signal to flush in EncColMultBolt. Outputting {} results.", resultsMap.keySet().size()); + for (Long key : resultsMap.keySet()) + // key = column Id, value = aggregated product + outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(key, resultsMap.get(key))); + resultsMap.clear(); + + // Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed. + outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new Long(-1), BigInteger.valueOf(0))); + outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1)); + numFlushSignals = 0; + } + } + else + { + // Data tuple received. Do column multiplication.
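+ // Multiplying ciphertexts and reducing mod N^2 keeps each running product in the Paillier ciphertext space,
+ // so resultsMap holds the encrypted running column sums accumulated so far.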
+ + long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD); + colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD); + + logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex); + + if (resultsMap.containsKey(colIndex)) + { + colMult = colVal1.multiply(resultsMap.get(colIndex)); + resultsMap.put(colIndex, colMult.mod(nSquared)); + } + else + { + resultsMap.put(colIndex, colVal1); + } + } + outputCollector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID, + new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD)); + outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Fields("finished")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java new file mode 100644 index 0000000..639a52b --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * Bolt class to perform the encrypted row calculation + * <p> + * Receives a {@code <hash(selector), dataPartitions>} tuple as input. + * <p> + * Encrypts the row data and emits a (column index, encrypted row-value) tuple for each encrypted block. + * <p> + * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush its output and resets all counters. At that point, all outgoing (column index, + * encrypted row-value) tuples are buffered until a SESSION_END signal is received back from each EncColMultBolt. 
+ */ +public class EncRowCalcBolt extends BaseRichBolt +{ + private static final long serialVersionUID = 1L; + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncRowCalcBolt.class); + + private OutputCollector outputCollector; + private static Query query; + private static boolean querySet = false; + + private Boolean limitHitsPerSelector; + private Long maxHitsPerSelector; + private Long totalEndSigs; + private int rowDivisions; + private Boolean saltColumns; + private Boolean splitPartitions; + + private Random rand; + + // These are the main data structures used here. + private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>(); + private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>(); + private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>(); + private List<BigInteger> dataArray = new ArrayList<>(); + + private int numEndSigs = 0; + + // These buffered values are used in the case when a session has been flushed, but the SESSION_END signal has not been received + // yet from the next bolt. + private boolean buffering = false; + private List<Tuple2<Long,BigInteger>> bufferedValues = new ArrayList<>(); + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector coll) + { + outputCollector = coll; + setQuery(map); + logger.info("EncRowCalcBolt hdfs = " + map.get(StormConstants.USE_HDFS)); + + maxHitsPerSelector = (Long) map.get(StormConstants.MAX_HITS_PER_SEL_KEY); + limitHitsPerSelector = (Boolean) map.get(StormConstants.LIMIT_HITS_PER_SEL_KEY); + totalEndSigs = (Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY); + splitPartitions = (Boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY); + saltColumns = (Boolean) map.get(StormConstants.SALT_COLUMNS_KEY); + rowDivisions = ((Long) map.get(StormConstants.ROW_DIVISIONS_KEY)).intValue(); + + // If splitPartitions==true, the data arrives partition by partition, rather than record by record. + // The numRecords counter below then increments for every partition element and would exceed the + // maxHitsPerSelector param far too soon unless the latter is scaled up accordingly. + if (splitPartitions) + maxHitsPerSelector *= query.getQueryInfo().getNumPartitionsPerDataElement(); + + rand = new Random(); + + logger.info("Initialized EncRowCalcBolt."); + } + + @Override + public void execute(Tuple tuple) + { + if (tuple.getSourceStreamId().equals(StormConstants.DEFAULT)) + { + matrixElements = processTupleFromPartitionDataBolt(tuple); // tuple: <hash,partitions> + + if (buffering) + { + logger.debug("Buffering tuple."); + bufferedValues.addAll(matrixElements); + } + else + { + emitTuples(matrixElements); + } + } + else if (StormUtils.isTickTuple(tuple) && !buffering) + { + logger.debug("Sending flush signal to EncColMultBolt."); + outputCollector.emit(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Values(1)); + + colIndexByRow.clear(); + hitsByRow.clear(); + + buffering = true; + } + else if (tuple.getSourceStreamId().equals(StormConstants.ENCCOLMULTBOLT_SESSION_END)) + { + numEndSigs += 1; + logger.debug("SessionEnd signal {} of {} received", numEndSigs, totalEndSigs); + + // Need to receive signal from all EncColMultBolt instances before stopping buffering.
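+ // Releasing the buffer early could interleave next-session tuples with a flush still in progress downstream.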
+ if (numEndSigs == totalEndSigs) + { + logger.debug("Buffering completed, emitting {} tuples.", bufferedValues.size()); + emitTuples(bufferedValues); + bufferedValues.clear(); + buffering = false; + + numEndSigs = 0; + } + } + outputCollector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, + new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT)); + outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH)); + } + + /*** + * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the + * colIndexByRow and hitsByRow appropriately. + * + * @param tuple + * @return + */ + private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple) + { + matrixElements.clear(); + int rowIndex = tuple.getIntegerByField(StormConstants.HASH_FIELD); + + if (!colIndexByRow.containsKey(rowIndex)) + { + colIndexByRow.put(rowIndex, 0); + hitsByRow.put(rowIndex, 0); + } + + if (splitPartitions) + { + dataArray.add((BigInteger) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD)); + } + else + { + dataArray = (ArrayList<BigInteger>) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD); + } + logger.debug("Retrieving {} elements in EncRowCalcBolt.", dataArray.size()); + + try + { + int colIndex = colIndexByRow.get(rowIndex); + int numRecords = hitsByRow.get(rowIndex); + + if (limitHitsPerSelector && numRecords < maxHitsPerSelector) + { + logger.debug("computing matrix elements."); + matrixElements = ComputeEncryptedRow.computeEncRow(dataArray, query, rowIndex, colIndex); + colIndexByRow.put(rowIndex, colIndex + matrixElements.size()); + hitsByRow.put(rowIndex, numRecords + 1); + } + else if (limitHitsPerSelector) + { + logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + numRecords); + } + } catch (IOException e) + { + logger.warn("Caught IOException while encrypting row. ", e); + } + + dataArray.clear(); + return matrixElements; + } + + private void emitTuples(List<Tuple2<Long,BigInteger>> matrixElements) + { + // saltColumns distributes the column multiplication done in the next bolt EncColMultBolt to avoid hotspotting. 
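+ // The salt is drawn from [0, rowDivisions) and participates in the fields grouping to EncColMultBolt,
+ // so tuples for the same column index can be spread across up to rowDivisions downstream instances.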
+ if (saltColumns) + { + for (Tuple2<Long,BigInteger> sTuple : matrixElements) + { + outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), rand.nextInt(rowDivisions))); + } + } + else + { + for (Tuple2<Long,BigInteger> sTuple : matrixElements) + { + outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), 0)); + } + } + } + + private synchronized static void setQuery(Map map) + { + if (!querySet) + { + query = StormUtils.prepareQuery(map); + querySet = true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java new file mode 100644 index 0000000..68b02f3 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.serialization.HadoopFileSystemStore; +import org.apache.pirk.serialization.LocalFileSystemStore; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.math.BigInteger; +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Bolt to compute and output the final Response object for a query + * <p> + * Receives {@code <colIndex, colProduct>} tuples, computes the final column product for each colIndex, records the results in the final Response object, and + * outputs the final Response object for the query. + * <p> + * Flush signals are sent to the OutputBolt from the EncColMultBolts via a tuple of the form {@code <-1, 0>}. Once a flush signal has been received from each + * EncColMultBolt (or a timeout is reached), the final column product is computed and the final Response is formed and emitted.
+ * <p> + * Currently, the Responses are written to HDFS at the location specified by the outputFile with the timestamp appended. + * <p> + * TODO: -- Enable other Response output locations + * + */ +public class OutputBolt extends BaseRichBolt +{ + private static final long serialVersionUID = 1L; + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class); + + private OutputCollector outputCollector; + private QueryInfo queryInfo; + private Response response; + private String outputFile; + private boolean hdfs; + private String hdfsUri; + private Integer flushCounter = 0; + private List<Tuple> tuplesToAck = new ArrayList<>(); + private Integer totalFlushSigs; + + private LocalFileSystemStore localStore; + private HadoopFileSystemStore hadoopStore; + + // This latch just serves as a hook for testing. + public static CountDownLatch latch = new CountDownLatch(4); + + // This is the main object here. It holds column Id -> product + private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>(); + + private BigInteger colVal; + private BigInteger colMult; + + private BigInteger nSquared; + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) + { + outputCollector = collector; + + totalFlushSigs = ((Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY)).intValue(); + outputFile = (String) map.get(StormConstants.OUTPUT_FILE_KEY); + + hdfs = (boolean) map.get(StormConstants.USE_HDFS); + + if (hdfs) + { + hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY); + try + { + FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration()); + hadoopStore = new HadoopFileSystemStore(fs); + } catch (IOException e) + { + logger.error("Failed to initialize Hadoop file system for output."); + throw new RuntimeException(e); + } + } + else + { + localStore = new LocalFileSystemStore(); + } + nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY)); + queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY)); + response = new Response(queryInfo); + + logger.info("Initialized OutputBolt."); + } + + @Override + public void execute(Tuple tuple) + { + long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD); + colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD); + + // colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed its values. + // Could have created a new stream for such signals, but that seemed like overkill. + if (colIndex == -1) + { + flushCounter++; + + logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs); + + // Wait till all EncColMultBolts have been flushed + if (flushCounter == totalFlushSigs) + { + logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size()); + try + { + String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()); + for (long cv : resultsMap.keySet()) + { + response.addElement((int) cv, resultsMap.get(cv)); + } + + if (hdfs) + { + hadoopStore.store(new Path(outputFile + "_" + timestamp), response); + } + else + { // In order to accommodate testing, this does not currently include timestamp. + // Should probably be fixed, but this will not likely be used outside of testing.
+ localStore.store(new File(outputFile), response); + for (long cv : resultsMap.keySet()) + { + // Elements were already added to the response above; just log them here. + logger.debug("column = " + cv + ", value = " + resultsMap.get(cv).toString()); + } + } + } catch (IOException e) + { + logger.warn("Unable to write output file.", e); + } + + // Reset + resultsMap.clear(); + flushCounter = 0; + for (Tuple t : tuplesToAck) + outputCollector.ack(t); + // Used for integration test + latch.countDown(); + } + } + else + { + // Process data values: add them to map. The column multiplication is only done in the case where saltColumns==true, + // in which case a small number of multiplications still need to be done per column. + if (resultsMap.containsKey(colIndex)) + { + colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared); + resultsMap.put(colIndex, colMult); + } + else + { + resultsMap.put(colIndex, colVal); + } + logger.debug("column = " + colIndex + ", value = " + resultsMap.get(colIndex).toString()); + } + outputCollector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + {} +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java new file mode 100644 index 0000000..9d24620 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.pirk.responder.wideskies.storm; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.query.wideskies.QueryUtils; + +import org.apache.pirk.schema.data.DataSchema; +import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.slf4j.LoggerFactory; + +/** + * Bolt to extract the partitions of the data record and output {@code <hash(selector), dataPartitions>} + * <p> + * Currently receives a {@code <hash(selector), JSON data record>} as input. + * <p> + * + */ +public class PartitionDataBolt extends BaseBasicBolt +{ + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PartitionDataBolt.class); + + private static final long serialVersionUID = 1L; + + private QueryInfo queryInfo; + private String queryType; + private QuerySchema qSchema = null; + + private boolean embedSelector; + + private boolean splitPartitions; + + private JSONObject json; + private List<BigInteger> partitions; + + @Override + public void prepare(Map map, TopologyContext context) + { + queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY)); + queryType = queryInfo.getQueryType(); + embedSelector = queryInfo.getEmbedSelector(); + logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS)); + StormUtils.initializeSchemas(map, "partition"); + try + { + if ((boolean) map.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY)) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = QuerySchemaRegistry.get(queryType); + } + } catch (Exception e) + { + logger.error("Unable to initialize schemas in PartitionDataBolt. ", e); + } + + json = new JSONObject(); + splitPartitions = (boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY); + + logger.info("Initialized PartitionDataBolt."); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector outputCollector) + { + int hash = tuple.getIntegerByField(StormConstants.HASH_FIELD); + json = (JSONObject) tuple.getValueByField(StormConstants.JSON_DATA_FIELD); + + try + { + partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector); + + logger.debug("PartitionDataBolt processing {} - outputting {} partitions", json.toString(), partitions.size()); + + // splitPartitions determines whether each partition piece is sent individually or the full array is sent together. + // Since processing in the follow-on bolt (EncRowCalcBolt) is computationally expensive, current working theory is + // that splitting them up allows for better throughput. Though maybe with better knowledge/tuning of Storm internals + // and parameters (e.g. certain buffer sizes), it may make no difference.
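+ // When split, the downstream EncRowCalcBolt sees one tuple per partition element and scales maxHitsPerSelector to compensate (see its prepare()).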
+ if (splitPartitions) + { + for (BigInteger partition : partitions) + { + outputCollector.emit(new Values(hash, partition)); + } + } + else + { + outputCollector.emit(new Values(hash, partitions)); + } + + } catch (Exception e) + { + logger.warn("Failed to partition data for record -- " + json + "\n", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) + { + outputFieldsDeclarer.declare(new Fields(StormConstants.HASH_FIELD, StormConstants.PARTIONED_DATA_FIELD)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java new file mode 100644 index 0000000..76bb80c --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.query.wideskies.QueryUtils; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.utils.KeyedHash; + +import org.apache.storm.Config; +import org.apache.storm.kafka.StringScheme; +import org.apache.storm.spout.Scheme; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Scheme used by spout to retrieve and hash selector from JSON data on Kafka. 
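+ * <p>
+ * Each Kafka message is parsed as JSON, the selector is extracted according to the query schema, and a keyed hash of the selector is computed,
+ * so that the spout emits {@code <hash(selector), JSON record>} tuples.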
+ */ +public class PirkHashScheme extends StringScheme implements Scheme +{ + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkHashScheme.class); + + private QueryInfo queryInfo; + + transient private JSONParser parser; + transient private JSONObject json; + private boolean initialized = false; + private QuerySchema qSchema; + private Config conf; + + public PirkHashScheme(Config conf) + { + this.conf = conf; + } + + public List<Object> deserialize(ByteBuffer bytes) + { + if (!initialized) + { + parser = new JSONParser(); + queryInfo = new QueryInfo((Map) conf.get(StormConstants.QUERY_INFO_KEY)); + + StormUtils.initializeSchemas(conf, "hashScheme"); + + if ((boolean) conf.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY)) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType()); + } + + initialized = true; + } + String str = super.deserializeString(bytes); + + try + { + json = (JSONObject) parser.parse(str); + } catch (ParseException e) + { + json = null; + logger.warn("ParseException parsing " + str, e); + } + String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json); + int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector); + + return new Values(hash, json); + } + + public Fields getOutputFields() + { + return new Fields(StormConstants.HASH_FIELD, StormConstants.JSON_DATA_FIELD); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java new file mode 100644 index 0000000..ddfca8b --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.utils.SystemConfiguration; +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.*; +import org.apache.storm.spout.SchemeAsMultiScheme; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.slf4j.LoggerFactory; + +/** + * Storm topology class for wideskies Pirk implementation + * <p> + * + */ +public class PirkTopology +{ + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkTopology.class); + + private static final String kafkaClientId = SystemConfiguration.getProperty("kafka.clientId", "KafkaSpout"); + private static final String brokerZk = SystemConfiguration.getProperty("kafka.zk", "localhost:2181"); + private static final String kafkaTopic = SystemConfiguration.getProperty("kafka.topic", "pirkTopic"); + private static final Boolean forceFromStart = Boolean.parseBoolean(SystemConfiguration.getProperty("kafka.forceFromStart", "false")); + + private static final Boolean useHdfs = Boolean.parseBoolean(SystemConfiguration.getProperty("hdfs.use", "true")); + private static final String hdfsUri = SystemConfiguration.getProperty("hdfs.uri", "localhost"); + + private static final String topologyName = SystemConfiguration.getProperty("storm.topoName", "PirkTopology"); + private static final Integer numWorkers = Integer.parseInt(SystemConfiguration.getProperty("storm.workers", "1")); + + private static final Integer spoutParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.spout.parallelism", "1")); + private static final Integer partitionDataBoltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.partitiondata.parallelism", "1")); + private static final Integer encrowcalcboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.parallelism", "1")); + private static final Integer enccolmultboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.enccolmultbolt.parallelism", "1")); + + private static final Boolean saltColumns = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.saltColumns", "false")); + private static final Boolean splitPartitions = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.splitPartitions", "false")); + + private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput"); + private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile"); + + public static void runPirkTopology() throws Exception + { + // Set up Kafka parameters + logger.info("Configuring Kafka."); + String zkRoot = "/" + kafkaTopic + "_pirk_storm"; + BrokerHosts zkHosts = new ZkHosts(brokerZk); + SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, kafkaTopic, zkRoot, kafkaClientId); + kafkaConfig.ignoreZkOffsets = forceFromStart; + + // Create conf + logger.info("Retrieving Query and generating Storm conf."); + Config conf = createStormConf(); + Query query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile); + conf.put(StormConstants.N_SQUARED_KEY, query.getNSquared().toString()); + conf.put(StormConstants.QUERY_INFO_KEY, query.getQueryInfo().toMap()); + + // Configure this for different types of input data on Kafka. 
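+ // PirkHashScheme does the parse/hash work spout-side, so downstream bolts receive <hash(selector), JSON record> tuples directly.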
+ kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf)); + + // Create topology + StormTopology topology = getPirkTopology(kafkaConfig); + + // Run topology + logger.info("Submitting Pirk topology to Storm..."); + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology); + + } // runPirkTopology + + /*** + * Creates Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt. Requires KafkaConfig to initialize the KafkaSpout. + * + * @param kafkaConfig + * @return + */ + public static StormTopology getPirkTopology(SpoutConfig kafkaConfig) + { + // Create spout and bolts + KafkaSpout spout = new KafkaSpout(kafkaConfig); + PartitionDataBolt partitionDataBolt = new PartitionDataBolt(); + EncRowCalcBolt ercbolt = new EncRowCalcBolt(); + EncColMultBolt ecmbolt = new EncColMultBolt(); + OutputBolt outputBolt = new OutputBolt(); + + // Build Storm topology + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism); + + builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID, + new Fields(StormConstants.HASH_FIELD)); + + // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3). + BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, ercbolt, encrowcalcboltParallelism) + .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD)) + .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END) + .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple"))); + + // b2.setMemoryLoad(5000); + // b2.setCPULoad(150.0); + + BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism) + .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, + new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT)) + .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG); + // b3.setMemoryLoad(5000); + // b3.setCPULoad(500.0); + + builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID); + + return builder.createTopology(); + } + + public static Config createStormConf() + { + + Boolean limitHitsPerSelector = Boolean.parseBoolean(SystemConfiguration.getProperty("pir.limitHitsPerSelector")); + Integer maxHitsPerSelector = Integer.parseInt(SystemConfiguration.getProperty("pir.maxHitsPerSelector")); + Integer rowDivisions = Integer.parseInt(SystemConfiguration.getProperty("storm.rowDivs", "1")); + + Config conf = new Config(); + conf.setNumAckers(Integer.parseInt(SystemConfiguration.getProperty("storm.numAckers", numWorkers.toString()))); + conf.setMaxSpoutPending(Integer.parseInt(SystemConfiguration.getProperty("storm.maxSpoutPending", "300"))); + conf.setNumWorkers(numWorkers); + conf.setDebug(false); + // conf.setNumEventLoggers(2); + + conf.put(conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024"))); + conf.put(conf.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024"))); + conf.put(conf.TOPOLOGY_TRANSFER_BUFFER_SIZE,
Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32"))); + conf.put(conf.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750"))); + conf.put(conf.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128"))); + + // Pirk parameters to send to bolts + conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true")); + conf.put(StormConstants.QSCHEMA_KEY, SystemConfiguration.getProperty("query.schemas")); + conf.put(StormConstants.DSCHEMA_KEY, SystemConfiguration.getProperty("data.schemas")); + conf.put(StormConstants.HDFS_URI_KEY, hdfsUri); + conf.put(StormConstants.QUERY_FILE_KEY, queryFile); + conf.put(StormConstants.USE_HDFS, useHdfs); + conf.put(StormConstants.OUTPUT_FILE_KEY, outputPath); + conf.put(StormConstants.LIMIT_HITS_PER_SEL_KEY, limitHitsPerSelector); + conf.put(StormConstants.MAX_HITS_PER_SEL_KEY, maxHitsPerSelector); + conf.put(StormConstants.SPLIT_PARTITIONS_KEY, splitPartitions); + conf.put(StormConstants.SALT_COLUMNS_KEY, saltColumns); + conf.put(StormConstants.ROW_DIVISIONS_KEY, rowDivisions); + conf.put(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY, encrowcalcboltParallelism); + conf.put(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY, enccolmultboltParallelism); + + return conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java new file mode 100644 index 0000000..7f1e59d --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
new file mode 100644
index 0000000..7f1e59d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+public class StormConstants
+{
+  // Topology Components
+  public static final String SPOUT_ID = "kafkaspout";
+  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  public static final String OUTPUTBOLT_ID = "outputbolt";
+
+  // Extra Streams
+  public static final String DEFAULT = "default";
+  public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+
+  // Tuple Fields
+  // From HashBolt (and variants)
+  public static final String HASH_FIELD = "hash";
+  public static final String PARTIONED_DATA_FIELD = "parData";
+  public static final String JSON_DATA_FIELD = "data";
+  // From EncRowCalcBolt
+  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  // From EncColMultBolt
+  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+
+  // Configuration Keys
+  public static final String USE_HDFS = "useHdfs";
+  public static final String HDFS_URI_KEY = "hdfsUri";
+  public static final String QUERY_FILE_KEY = "queryFile";
+  public static final String QUERY_INFO_KEY = "queryInfo";
+  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+  public static final String QSCHEMA_KEY = "qSchema";
+  public static final String DSCHEMA_KEY = "dschema";
+  public static final String OUTPUT_FILE_KEY = "output";
+  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  public static final String SALT_COLUMNS_KEY = "saltColumns";
+  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  public static final String N_SQUARED_KEY = "nSquared";
+  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+
+  public static final String SALT = "salt";
+  public static final String FLUSH = "flush";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
new file mode 100644
index 0000000..7fbca66
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Utils class for the Storm implementation of Wideskies
+ */
+public class StormUtils
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(StormUtils.class);
+
+  /**
+   * Method to read in a serialized Query object from the given queryFile
+   *
+   * @param useHdfs read from HDFS when true, from the local filesystem otherwise
+   * @param hdfsUri the HDFS URI to use when useHdfs is true
+   * @param queryFile path to the serialized Query
+   * @return the deserialized Query
+   */
+  public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
+  {
+    Query query = null;
+
+    try
+    {
+      if (useHdfs)
+      {
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+        logger.info("reading query file from hdfs: " + queryFile);
+        query = (new HadoopFileSystemStore(fs)).recall(queryFile, Query.class);
+      }
+      else
+      {
+        logger.info("reading local query file from " + queryFile);
+        query = (new LocalFileSystemStore()).recall(queryFile, Query.class);
+      }
+    } catch (Exception e)
+    {
+      logger.error("Unable to initialize query info.", e);
+      throw new RuntimeException(e);
+    }
+    return query;
+  }
+
+  /**
+   * Method to read in and return a serialized Query object from the given file and initialize/load the query.schemas and data.schemas
+   *
+   * @param map topology configuration map holding the USE_HDFS, HDFS_URI_KEY, and QUERY_FILE_KEY values
+   * @return the deserialized Query, or null if it could not be read
+   */
+  public static Query prepareQuery(Map map)
+  {
+    Query query = null;
+
+    boolean useHdfs = (boolean) map.get(StormConstants.USE_HDFS);
+    String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+    String queryFile = (String) map.get(StormConstants.QUERY_FILE_KEY);
+    try
+    {
+      query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+    } catch (Exception e)
+    {
+      logger.warn("Unable to initialize query info.", e);
+    }
+
+    return query;
+  }
+
+  /***
+   * Initialize data and query schema. Conf requires values for USE_HDFS, HDFS_URI_KEY, DSCHEMA_KEY, and QSCHEMA_KEY.
+   *
+   * @param conf topology configuration map
+   * @param id unused here
+   */
+  public static void initializeSchemas(Map conf, String id)
+  {
+    SystemConfiguration.setProperty("data.schemas", (String) conf.get(StormConstants.DSCHEMA_KEY));
+    SystemConfiguration.setProperty("query.schemas", (String) conf.get(StormConstants.QSCHEMA_KEY));
+
+    try
+    {
+      boolean hdfs = (boolean) conf.get(StormConstants.USE_HDFS);
+      if (hdfs)
+      {
+        String hdfsUri = (String) conf.get(StormConstants.HDFS_URI_KEY);
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+        DataSchemaLoader.initialize(true, fs);
+        QuerySchemaLoader.initialize(true, fs);
+      }
+      else
+      {
+        DataSchemaLoader.initialize();
+        QuerySchemaLoader.initialize();
+      }
+    } catch (Exception e)
+    {
+      logger.error("Failed to initialize schema files.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static boolean isTickTuple(Tuple tuple)
+  {
+    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
+}
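Note: StormUtils.isTickTuple() above is the consumer side of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, which getPirkTopology() registers on EncRowCalcBolt from the storm.encrowcalcbolt.ticktuple property. A minimal sketch of the pattern, assuming a bolt in the same package (the flush/process methods are hypothetical placeholders, not the actual EncRowCalcBolt implementation):

package org.apache.pirk.responder.wideskies.storm;

import org.apache.storm.tuple.Tuple;

public abstract class TickAwareBoltSketch
{
  // Called by Storm for every incoming tuple, including system tick tuples
  public void execute(Tuple tuple)
  {
    if (StormUtils.isTickTuple(tuple))
    {
      // Periodic signal: flush accumulated partial results downstream
      flush();
      return;
    }
    processDataTuple(tuple);
  }

  protected abstract void flush();

  protected abstract void processDataTuple(Tuple tuple);
}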
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 28de96e..c651eaa 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,7 +31,9 @@ import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;
@@ -131,8 +133,8 @@ public class DataSchemaLoader
     InputStream is = null;
     if (hdfs)
     {
-      is = fs.open(new Path(schemaFile));
       logger.info("hdfs: filePath = " + schemaFile);
+      is = fs.open(fs.makeQualified(new Path(schemaFile)));
     }
     else
     {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/java/org/apache/pirk/test/utils/BaseTests.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index 962e467..b0c9326 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -77,8 +77,6 @@ public class BaseTests
   {
     logger.info("Running testDNSHostnameQuery(): ");
 
-    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
-
     int numExpectedResults = 6;
     List<QueryResponseJSON> results;
     if (isDistributed)
@@ -93,6 +91,14 @@ public class BaseTests
       numExpectedResults = 7; // all 7 for non distributed case; if testFalsePositive==true, then 6
       }
     }
+    checkDNSHostnameQueryResults(results, isDistributed, numExpectedResults, testFalsePositive, dataElements);
+    logger.info("Completed testDNSHostnameQuery(): ");
+  }
+
+  public static void checkDNSHostnameQueryResults(List<QueryResponseJSON> results, boolean isDistributed, int numExpectedResults,
+      boolean testFalsePositive, List<JSONObject> dataElements)
+  {
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
 
     logger.info("results:");
     printResultList(results);
@@ -188,7 +194,6 @@ public class BaseTests
         }
       }
     }
-    logger.info("Completed testDNSHostnameQuery(): ");
   }
 
   public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 8f82f1c..7501aaa 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -46,4 +46,4 @@
     </Root>
   </Loggers>
 
-</Configuration>
\ No newline at end of file
+</Configuration>

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/pirk.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index 963fa34..a88c846 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -228,9 +228,3 @@ pir.expDir = none
 
 #Parallelism for expLookupTable creation in hdfs
 pir.expCreationSplits = 600
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/main/resources/responder.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index 3ae92c7..ac6cb35 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -154,4 +154,47 @@ pir.queryInput=
 #default is false
 #spark.streaming.stopGracefullyOnShutdown=
 
-
\ No newline at end of file
+
+##Properties for Kafka
+#kafka.topic = topicName
+#kafka.clientId = pirk_spout
+
+# Kafka Zookeepers
+#kafka.zk = localhost:2181
+# Read from the beginning of the Kafka topic on startup
+#kafka.forceFromStart = false
+
+
+##Properties for Storm
+#storm.topoName = pir
+#storm.workers = 1
+#storm.numAckers = 1
+#storm.maxSpoutPending = 10
+#storm.worker.heapMemory = 6000
+#storm.componentOnheapMem = 600.0
+
+# This should be set to the number of Kafka partitions
+#storm.spout.parallelism = 1
+
+#storm.hashbolt.parallelism = 1
+#storm.encrowcalcbolt.parallelism = 1
+# This bolt is the most computationally expensive and should have the highest parallelism
+#storm.enccolmultbolt.parallelism = 2
+
+# These may be useful for tuning
+#storm.executor.receiveBufferSize = 1024
+#storm.executor.sendBufferSize = 1024
+#storm.transferBufferSize = 8
+
+# Frequency with which PIR matrix elements are flushed out
+#storm.encrowcalcbolt.ticktuple = 60
+
+# Design configurations:
+# When splitPartitions = true, the hashbolt emits an individual tuple for each data partition;
+# when false, it emits the batch of data partitions for a record in a single tuple
+#storm.splitPartitions = true
+# When saltColumns = true, a task running EncColMultBolt is only responsible for multiplying
+# a subset of the row for any individual column; when false, all multiplication for a single
+# column is done on a single EncColMultBolt instance
+#storm.saltColumns = true
+# Only makes sense to tune if saltColumns = true
+#storm.rowDivs = 1
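Note: pulling the commented defaults above together, a minimal uncommented set of Kafka/Storm entries for responder.properties might look like the following; hostnames, the topic name, and the parallelism values are illustrative only, and the rest of the responder configuration (platform selection, query input, output file) is assumed to be set as documented earlier in the file:

kafka.topic = pirkTopic
kafka.clientId = pirk_spout
kafka.zk = zk-host:2181
kafka.forceFromStart = false

storm.topoName = pir
storm.workers = 2
# match the number of Kafka partitions
storm.spout.parallelism = 3
storm.encrowcalcbolt.parallelism = 2
# most computationally expensive bolt, so highest parallelism
storm.enccolmultbolt.parallelism = 4
storm.splitPartitions = true
storm.saltColumns = true
storm.rowDivs = 2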
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/beb69e3f/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
new file mode 100644
index 0000000..906c337
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.QueryResultsWriter;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  private static int testCountDown = 4;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "8");
+    SystemConfiguration.setProperty("storm.rowDivs", "2");
+    SystemConfiguration.setProperty("hdfs.use", "false");
+
+    startZookeeper();
+    startKafka();
+
+    SystemConfiguration.setProperty("kafka.topic", topic);
+    SystemConfiguration.setProperty("storm.topoName", "pirTest");
+
+    // Create encrypted file
+    SystemConfiguration.setProperty("pir.stopListFile", "none");
+    Inputs.createSchemaFiles(StopListFilter.class.getName());
+
+    // Perform encryption. Sets queryInfo, nSquared, fileQuery, and fileQuerier
+    performEncryption();
+    SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
+
+    KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+    loadTestData(producer);
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+  }
+
+  private void runTest() throws Exception
+  {
+    File responderFile = File.createTempFile("responderFile", ".txt");
+    logger.info("Starting topology.");
+    runTopology(responderFile);
+
+    // decrypt results
+    logger.info("Decrypting results. " + responderFile.length());
+    File fileFinalResults = performDecryption(responderFile);
+
+    // check results
+    List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults);
+    BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, Inputs.createJSONDataElements());
+
+    responderFile.deleteOnExit();
+    fileFinalResults.deleteOnExit();
+  }
+
+  private void runTopology(File responderFile) throws Exception
+  {
+    MkClusterParam mkClusterParam = new MkClusterParam();
+    // The test sometimes fails because of timing issues when more than one supervisor is set.
+    mkClusterParam.setSupervisors(1);
+
+    // Maybe using "withSimulatedTimeLocalCluster" would be better to avoid worrying about timing.
+    Config conf = PirkTopology.createStormConf();
+    conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath());
+    conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap());
+    // conf.setDebug(true);
+    mkClusterParam.setDaemonConf(conf);
+
+    TestJob testJob = createPirkTestJob(conf);
+    Testing.withLocalCluster(mkClusterParam, testJob);
+    // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob);
+  }
+
+  private TestJob createPirkTestJob(final Config config)
+  {
+    final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config);
+    return new TestJob()
+    {
+      StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig);
+
+      @Override
+      public void run(ILocalCluster iLocalCluster) throws Exception
+      {
+        iLocalCluster.submitTopology("pirk_integration_test", config, topology);
+        logger.info("Pausing for setup.");
+        // Thread.sleep(4000);
+        // KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+        // loadTestData(producer);
+        // Thread.sleep(10000);
+        while (OutputBolt.latch.getCount() == testCountDown)
+        {
+          Thread.sleep(1000);
+        }
+        testCountDown -= 1;
+
+        logger.info("Finished...");
+      }
+    };
+  }
+
+  private SpoutConfig setUpTestKafkaSpout(Config conf)
+  {
+    ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString());
+
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, "/pirk_test_root", "pirk_integr_test_spout");
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+    logger.info("KafkaConfig initialized...");
+
+    return kafkaConfig;
+  }
+
+  private void startZookeeper() throws Exception
+  {
+    logger.info("Starting zookeeper.");
+    zookeeperLocalCluster = new TestingServer();
+    zookeeperLocalCluster.start();
+    logger.info("Zookeeper initialized.");
+  }
+
+  private void startKafka() throws Exception
+  {
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    Properties props = new Properties();
+    props.setProperty("zookeeper.session.timeout.ms", "100000");
+    props.put("advertised.host.name", "localhost");
+    props.put("port", 11111);
+    // props.put("broker.id", "0");
+    props.put("log.dir", kafkaTmpDir);
+    props.put("enable.zookeeper", "true");
+    props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
+    KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
+    kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), scala.Option.apply("kafkaThread"));
+    kafkaLocalBroker.startup();
+
+    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
+    // ZkUtils zkUtils = ZkUtils.apply(zookeeperLocalCluster.getConnectString(), 60000, 60000, false);
+    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception
+  {
+    zkClient.close();
+    kafkaLocalBroker.shutdown();
+    zookeeperLocalCluster.stop();
+
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    fileQuery.delete();
+    fileQuerier.delete();
+  }
+
+  private HashMap<String,Object> createKafkaProducerConfig()
+  {
+    String kafkaHostName = "localhost";
+    Integer kafkaPorts = 11111;
+    HashMap<String,Object> config = new HashMap<String,Object>();
+    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
":" + kafkaPorts); + config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + return config; + } + + private void loadTestData(KafkaProducer producer) + { + for (JSONObject dataRecord : Inputs.createJSONDataElements()) + { + logger.info("Sending record to Kafka " + dataRecord.toString()); + producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString())); + } + } + + private void performEncryption() throws Exception + { + // ArrayList<String> selectors = BaseTests.selectorsDomain; + List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net")); + String queryType = Inputs.DNS_HOSTNAME_QUERY; + + Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty); + + nSquared = paillier.getNSquared(); + + queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType, + false, true, false); + + // Perform the encryption + logger.info("Performing encryption of the selectors - forming encrypted query vectors:"); + EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier); + Querier querier = encryptQuery.encrypt(1); + logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:"); + + // Write out files. + fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt"); + fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt"); + + localStore.store(fileQuerier.getAbsolutePath(), querier); + localStore.store(fileQuery, querier.getQuery()); + } + + private File performDecryption(File responseFile) throws Exception + { + File finalResults = File.createTempFile("finalFileResults", ".txt"); + String querierFilePath = fileQuerier.getAbsolutePath(); + String responseFilePath = responseFile.getAbsolutePath(); + String outputFile = finalResults.getAbsolutePath(); + int numThreads = 1; + + Response response = localStore.recall(responseFilePath, Response.class); + Querier querier = localStore.recall(querierFilePath, Querier.class); + + // Perform decryption and output the result file + DecryptResponse decryptResponse = new DecryptResponse(response, querier); + decryptResponse.decrypt(numThreads); + QueryResultsWriter.writeResultFile(outputFile, decryptResponse.decrypt(numThreads)); + return finalResults; + } + +}
