Repository: incubator-pirk Updated Branches: refs/heads/master ef8d1c1a5 -> 442b21790
PIRK-25 Serialization and storage changes to Querier, Query, and Response - closes apache/incubator-pirk#18 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/442b2179 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/442b2179 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/442b2179 Branch: refs/heads/master Commit: 442b21790d16bf185b6bd8b56d649ae79b35ade0 Parents: ef8d1c1 Author: tellison <[email protected]> Authored: Sat Jul 23 19:34:45 2016 -0400 Committer: eawilliams <[email protected]> Committed: Sat Jul 23 19:34:45 2016 -0400 ---------------------------------------------------------------------- .../apache/pirk/querier/wideskies/Querier.java | 91 +----------- .../pirk/querier/wideskies/QuerierDriver.java | 9 +- .../querier/wideskies/encrypt/EncryptQuery.java | 34 ----- .../org/apache/pirk/query/wideskies/Query.java | 148 +------------------ .../responder/wideskies/ResponderDriver.java | 3 +- .../wideskies/mapreduce/ColumnMultReducer.java | 3 +- .../mapreduce/ComputeResponseTool.java | 5 +- .../wideskies/mapreduce/ExpTableMapper.java | 5 +- .../mapreduce/FinalResponseReducer.java | 9 +- .../HashSelectorsAndPartitionDataMapper.java | 4 +- .../wideskies/mapreduce/RowCalcReducer.java | 3 +- .../wideskies/spark/ComputeExpLookupTable.java | 10 +- .../wideskies/spark/ComputeResponse.java | 16 +- .../wideskies/standalone/Responder.java | 3 +- .../pirk/response/wideskies/Response.java | 134 +---------------- .../serialization/HadoopFileSystemStore.java | 94 ++++++++++++ .../pirk/serialization/JavaSerializer.java | 49 ++++++ .../pirk/serialization/JsonSerializer.java | 41 +++++ .../serialization/LocalFileSystemStore.java | 82 ++++++++++ .../serialization/SerializationService.java | 49 ++++++ .../org/apache/pirk/serialization/Storable.java | 25 ++++ .../pirk/serialization/StorageService.java | 39 +++++ .../distributed/testsuite/DistTestSuite.java | 5 +- .../apache/pirk/test/utils/StandaloneQuery.java | 13 +- 24 files changed, 448 insertions(+), 426 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/Querier.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java index 7ffc7a0..4d6523d 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/Querier.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/Querier.java @@ -1,4 +1,4 @@ -/* +/******************************************************************************* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,15 +15,9 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - */ + *******************************************************************************/ package org.apache.pirk.querier.wideskies; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -31,6 +25,7 @@ import java.util.HashMap; import org.apache.pirk.encryption.Paillier; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.serialization.Storable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +33,7 @@ import org.slf4j.LoggerFactory; * Class to hold the information necessary for the PIR querier to perform decryption * */ -public class Querier implements Serializable +public class Querier implements Serializable, Storable { private static final long serialVersionUID = 1L; @@ -95,82 +90,4 @@ public class Querier implements Serializable { return embedSelectorMap; } - - /** - * Method to serialize the Querier object to a file - */ - public void writeToFile(String filename) throws IOException - { - writeToFile(new File(filename)); - } - - /** - * Method to serialize the Querier object to a file - */ - public void writeToFile(File file) throws IOException - { - ObjectOutputStream oos = null; - FileOutputStream fout = null; - try - { - fout = new FileOutputStream(file, true); - oos = new ObjectOutputStream(fout); - oos.writeObject(this); - } catch (Exception ex) - { - ex.printStackTrace(); - } finally - { - if (oos != null) - { - oos.close(); - } - if (fout != null) - { - fout.close(); - } - } - } - - /** - * Reconstruct the Querier object from its file serialization - */ - public static Querier readFromFile(String filename) throws IOException - { - - return readFromFile(new File(filename)); - } - - /** - * Reconstruct the Querier object from its file serialization - */ - public static Querier readFromFile(File file) throws IOException - { - Querier querier = null; - - FileInputStream fIn = null; - ObjectInputStream oIn; - try - { - fIn = new FileInputStream(file); - oIn = new ObjectInputStream(fIn); - querier = (Querier) oIn.readObject(); - } catch (IOException | ClassNotFoundException e) - { - e.printStackTrace(); - } finally - { - if (fIn != null) - { - try - { - fIn.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - return querier; - } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java index 01a6c86..8f287fd 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java @@ -29,6 +29,7 @@ import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.query.LoadQuerySchemas; +import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.FileIOUtils; import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; @@ -82,6 +83,7 @@ public class QuerierDriver implements Serializable String outputFile; String queryType = null; int numThreads; + LocalFileSystemStore storage = new LocalFileSystemStore(); // Encryption variables int hashBitSize = 0; @@ -182,14 +184,15 @@ public class QuerierDriver implements Serializable // Write necessary output files - two files written - // (1) Querier object to <outputFile>-QuerierConst.QUERIER_FILETAG // (2) Query object to <outputFile>-QuerierConst.QUERY_FILETAG - encryptQuery.writeOutputFiles(outputFile); + storage.store(outputFile + "-" + QuerierConst.QUERIER_FILETAG, encryptQuery.getQuerier()); + storage.store(outputFile + "-" + QuerierConst.QUERY_FILETAG, encryptQuery.getQuery()); } else // Decryption { // Reconstruct the necessary objects from the files - Response response = Response.readFromFile(inputFile); - Querier querier = Querier.readFromFile(querierFile); + Response response = storage.recall(inputFile, Response.class); + Querier querier = storage.recall(querierFile, Querier.class); // Perform decryption and output the result file DecryptResponse decryptResponse = new DecryptResponse(response, querier); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java index 87ee9d9..a277c46 100644 --- a/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java +++ b/src/main/java/org/apache/pirk/querier/wideskies/encrypt/EncryptQuery.java @@ -258,38 +258,4 @@ public class EncryptQuery } logger.info("Completed creation of encrypted query vectors"); } - - /** - * Creates two output files - two files written: - * <p> - * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG - * <p> - * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG - */ - public void writeOutputFiles(String filePrefix) throws IOException - { - // Write the Querier object - querier.writeToFile(filePrefix + "-" + QuerierConst.QUERIER_FILETAG); - - // Write the Query object - query.writeToFile(filePrefix + "-" + QuerierConst.QUERY_FILETAG); - } - - /** - * Creates two output files - two files written: - * <p> - * (1) Querier object to <filePrefix>-QuerierConst.QUERIER_FILETAG - * <p> - * (2) Query object to <filePrefix>-QuerierConst.QUERY_FILETAG - * <p> - * This method is used for functional testing - */ - public void writeOutputFiles(File fileQuerier, File fileQuery) throws IOException - { - // Write the Querier object - querier.writeToFile(fileQuerier); - - // Write the Query object - query.writeToFile(fileQuery); - } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/query/wideskies/Query.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java index ebaafbb..2035d4b 100644 --- a/src/main/java/org/apache/pirk/query/wideskies/Query.java +++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java @@ -18,12 +18,6 @@ */ package org.apache.pirk.query.wideskies; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; @@ -33,10 +27,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.querier.wideskies.encrypt.ExpTableRunnable; +import org.apache.pirk.serialization.Storable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +37,7 @@ import org.slf4j.LoggerFactory; * Class to hold the PIR query vectors * */ -public class Query implements Serializable +public class Query implements Serializable, Storable { private static final long serialVersionUID = 1L; @@ -225,141 +218,4 @@ public class Query implements Serializable { return expTable.get(value).get(power); } - - public void writeToFile(String filename) throws IOException - { - writeToFile(new File(filename)); - } - - public void writeToFile(File file) throws IOException - { - ObjectOutputStream oos = null; - FileOutputStream fout = null; - try - { - fout = new FileOutputStream(file, true); - oos = new ObjectOutputStream(fout); - oos.writeObject(this); - } catch (Exception ex) - { - ex.printStackTrace(); - } finally - { - if (oos != null) - { - oos.close(); - } - if (fout != null) - { - fout.close(); - } - } - } - - /** - * Reconstruct the Query object from its file serialization - */ - public static Query readFromFile(String filename) throws IOException - { - - return readFromFile(new File(filename)); - } - - /** - * Reconstruct the Query object from its file serialization - */ - public static Query readFromFile(File file) throws IOException - { - Query query = null; - - FileInputStream fIn = null; - ObjectInputStream oIn; - try - { - fIn = new FileInputStream(file); - oIn = new ObjectInputStream(fIn); - query = (Query) oIn.readObject(); - } catch (IOException | ClassNotFoundException e) - { - e.printStackTrace(); - } finally - { - if (fIn != null) - { - try - { - fIn.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - return query; - } - - /** - * Method to write the Query object to a file in HDFS - * - */ - public void writeToHDFSFile(Path fileName, FileSystem fs) - { - - ObjectOutputStream oos = null; - try - { - oos = new ObjectOutputStream(fs.create(fileName)); - oos.writeObject(this); - oos.close(); - } catch (IOException e) - { - e.printStackTrace(); - } finally - { - if (oos != null) - { - try - { - oos.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - } - - /** - * Method to reconstruct the Query object from its file serialization in HDFS - */ - public static Query readFromHDFSFile(Path filename, FileSystem fs) - { - Query pirWLQuery = null; - - ObjectInputStream ois = null; - try - { - ois = new ObjectInputStream(fs.open(filename)); - pirWLQuery = (Query) ois.readObject(); - ois.close(); - - } catch (IOException | ClassNotFoundException e1) - { - e1.printStackTrace(); - } finally - { - if (ois != null) - { - try - { - ois.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - - return pirWLQuery; - } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java index 4cd6b5f..61dbb23 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java @@ -25,6 +25,7 @@ import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool; import org.apache.pirk.responder.wideskies.spark.ComputeResponse; import org.apache.pirk.responder.wideskies.standalone.Responder; +import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.SystemConfiguration; /** @@ -65,7 +66,7 @@ public class ResponderDriver System.out.println("Launching Standalone Responder:"); String queryInput = SystemConfiguration.getProperty("pir.queryInput"); - Query query = Query.readFromFile(queryInput); + Query query = new LocalFileSystemStore().recall(queryInput, Query.class); Responder pirResponder = new Responder(query); pirResponder.computeStandaloneResponse(); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java index abffadf..df3b7d0 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ColumnMultReducer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.FileConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ public class ColumnMultReducer extends Reducer<LongWritable,Text,LongWritable,Te FileSystem fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - query = Query.readFromHDFSFile(new Path(queryDir), fs); + query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class); } @Override http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java index fb3027b..6eab9fe 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java @@ -49,6 +49,7 @@ import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.FileConst; import org.apache.pirk.utils.HDFS; import org.apache.pirk.utils.SystemConfiguration; @@ -127,7 +128,7 @@ public class ComputeResponseTool extends Configured implements Tool LoadDataSchemas.initialize(true, fs); LoadQuerySchemas.initialize(true, fs); - query = Query.readFromHDFSFile(new Path(queryInputDir), fs); + query = new HadoopFileSystemStore(fs).recall(queryInputDir, Query.class); queryInfo = query.getQueryInfo(); qSchema = LoadQuerySchemas.getSchema(queryInfo.getQueryType()); @@ -331,7 +332,7 @@ public class ComputeResponseTool extends Configured implements Tool // Place exp table in query object query.setExpFileBasedLookup(expFileTable); - query.writeToHDFSFile(new Path(queryInputDir), fs); + new HadoopFileSystemStore(fs).store(queryInputDir, query); logger.info("Completed creation of expTable"); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java index 28d49a3..c53acdc 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ExpTableMapper.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.math.BigInteger; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.pirk.encryption.ModPowAbstraction; import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,9 +52,8 @@ public class ExpTableMapper extends Mapper<LongWritable,Text,Text,Text> valueOut = new Text(); - FileSystem fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - query = Query.readFromHDFSFile(new Path(queryDir), fs); + query = new HadoopFileSystemStore(FileSystem.newInstance(ctx.getConfiguration())).recall(queryDir, Query.class); int dataPartitionBitSize = query.getQueryInfo().getDataPartitionBitSize(); maxValue = (int) Math.pow(2, dataPartitionBitSize) - 1; http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java index 1df7b0e..8f7cbe8 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/FinalResponseReducer.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.math.BigInteger; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; @@ -30,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +46,8 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable private Response response = null; private String outputFile = null; private FileSystem fs = null; + private HadoopFileSystemStore storage = null; + private QueryInfo queryInfo = null; @Override public void setup(Context ctx) throws IOException, InterruptedException @@ -56,8 +58,9 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable mos = new MultipleOutputs<>(ctx); fs = FileSystem.newInstance(ctx.getConfiguration()); + storage = new HadoopFileSystemStore(fs); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - Query query = Query.readFromHDFSFile(new Path(queryDir), fs); + Query query = storage.recall(queryDir, Query.class); QueryInfo queryInfo = query.getQueryInfo(); outputFile = ctx.getConfiguration().get("pirMR.outputFile"); @@ -83,7 +86,7 @@ public class FinalResponseReducer extends Reducer<LongWritable,Text,LongWritable @Override public void cleanup(Context ctx) throws IOException, InterruptedException { - response.writeToHDFSFile(new Path(outputFile), fs); + storage.store(outputFile, response); mos.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java index 95396a9..b04babd 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java @@ -36,6 +36,7 @@ import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; import org.apache.pirk.schema.query.filter.DataFilter; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.StringUtils; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; @@ -57,6 +58,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable HashSet<String> stopList = null; + private Query query = null; private QueryInfo queryInfo = null; private QuerySchema qSchema = null; private DataSchema dSchema = null; @@ -75,7 +77,7 @@ public class HashSelectorsAndPartitionDataMapper extends Mapper<Text,MapWritable // Can make this so that it reads multiple queries at one time... String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - Query query = Query.readFromHDFSFile(new Path(queryDir), fs); + query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class); queryInfo = query.getQueryInfo(); try http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java index e35ee84..ea57d2d 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/RowCalcReducer.java @@ -37,6 +37,7 @@ import org.apache.pirk.schema.data.DataSchema; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.FileConst; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; @@ -79,7 +80,7 @@ public class RowCalcReducer extends Reducer<IntWritable,BytesArrayWritable,LongW fs = FileSystem.newInstance(ctx.getConfiguration()); String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir"); - query = Query.readFromHDFSFile(new Path(queryDir), fs); + query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class); queryInfo = query.getQueryInfo(); try http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java index 938c32e..2feeca8 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeExpLookupTable.java @@ -28,12 +28,14 @@ import java.util.TreeMap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.SystemConfiguration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.Tuple2; /** @@ -100,7 +102,13 @@ public class ComputeExpLookupTable // Place exp table in query object and in the BroadcastVars Map<Integer,String> queryHashFileNameMap = hashToPartition.collectAsMap(); query.setExpFileBasedLookup(new HashMap<>(queryHashFileNameMap)); - query.writeToHDFSFile(new Path(queryInputFile), fs); + try + { + new HadoopFileSystemStore(fs).store(queryInputFile, query); + } catch (IOException e) + { + e.printStackTrace(); + } bVars.setQuery(query); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index ba7fd12..c6b0d28 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -18,12 +18,12 @@ */ package org.apache.pirk.responder.wideskies.spark; +import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Map; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -36,6 +36,7 @@ import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.data.LoadDataSchemas; import org.apache.pirk.schema.query.LoadQuerySchemas; import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.apache.spark.SparkConf; @@ -45,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.hadoop.mr.EsInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.Tuple2; /** @@ -77,6 +79,7 @@ public class ComputeResponse private boolean useModExpJoin = false; private FileSystem fs = null; + private HadoopFileSystemStore storage = null; private JavaSparkContext sc = null; private Accumulators accum = null; @@ -93,6 +96,7 @@ public class ComputeResponse public ComputeResponse(FileSystem fileSys) throws Exception { fs = fileSys; + storage = new HadoopFileSystemStore(fs); dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat)) @@ -162,7 +166,7 @@ public class ComputeResponse bVars = new BroadcastVars(sc); // Set the Query and QueryInfo broadcast variables - query = Query.readFromHDFSFile(new Path(queryInput), fs); + query = storage.recall(queryInput, Query.class); queryInfo = query.getQueryInfo(); bVars.setQuery(query); bVars.setQueryInfo(queryInfo); @@ -366,7 +370,13 @@ public class ComputeResponse logger.debug("colNum = " + colVal + " column = " + encColResults.get(colVal).toString()); } - response.writeToHDFSFile(new Path(outputFile), fs); + try + { + storage.store(outputFile, response); + } catch (IOException e) + { + throw new RuntimeException(e); + } accum.printAll(); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java index 879b618..4ac3923 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/Responder.java @@ -30,6 +30,7 @@ import org.apache.pirk.query.wideskies.Query; import org.apache.pirk.query.wideskies.QueryInfo; import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.KeyedHash; import org.apache.pirk.utils.SystemConfiguration; import org.json.simple.JSONObject; @@ -126,7 +127,7 @@ public class Responder // Set the response object, extract, write to file String outputFile = SystemConfiguration.getProperty("pir.outputFile"); setResponseElements(); - response.writeToFile(outputFile); + new LocalFileSystemStore().store(outputFile, response); } /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/response/wideskies/Response.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java index 3d2a3c0..667b8a3 100644 --- a/src/main/java/org/apache/pirk/response/wideskies/Response.java +++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java @@ -18,19 +18,12 @@ */ package org.apache.pirk.response.wideskies; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.math.BigInteger; import java.util.TreeMap; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.serialization.Storable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +33,7 @@ import org.slf4j.LoggerFactory; * Serialized and returned to the querier for decryption * */ -public class Response implements Serializable +public class Response implements Serializable, Storable { private static final long serialVersionUID = 1L; @@ -76,127 +69,4 @@ public class Response implements Serializable { responseElements.put(position, element); } - - public void writeToFile(String filename) throws IOException - { - writeToFile(new File(filename)); - } - - public void writeToFile(File file) throws IOException - { - ObjectOutputStream oos = null; - FileOutputStream fout = null; - try - { - fout = new FileOutputStream(file, true); - oos = new ObjectOutputStream(fout); - oos.writeObject(this); - } catch (Exception ex) - { - ex.printStackTrace(); - } finally - { - if (oos != null) - { - oos.close(); - } - if (fout != null) - { - fout.close(); - } - } - } - - public void writeToHDFSFile(Path fileName, FileSystem fs) - { - - ObjectOutputStream oos = null; - try - { - oos = new ObjectOutputStream(fs.create(fileName)); - oos.writeObject(this); - oos.close(); - } catch (IOException e) - { - e.printStackTrace(); - } finally - { - if (oos != null) - { - try - { - oos.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - } - - public static Response readFromFile(String filename) throws IOException - { - return readFromFile(new File(filename)); - } - - public static Response readFromFile(File file) throws IOException - { - Response response = null; - - ObjectInputStream objectinputstream = null; - FileInputStream streamIn = null; - try - { - streamIn = new FileInputStream(file); - objectinputstream = new ObjectInputStream(streamIn); - response = (Response) objectinputstream.readObject(); - - } catch (Exception e) - { - e.printStackTrace(); - } finally - { - if (objectinputstream != null) - { - objectinputstream.close(); - } - if (streamIn != null) - { - streamIn.close(); - } - } - - return response; - } - - // Used for testing - public static Response readFromHDFSFile(Path file, FileSystem fs) throws IOException - { - Response response = null; - - ObjectInputStream ois = null; - try - { - ois = new ObjectInputStream(fs.open(file)); - response = (Response) ois.readObject(); - ois.close(); - - } catch (IOException | ClassNotFoundException e1) - { - e1.printStackTrace(); - } finally - { - if (ois != null) - { - try - { - ois.close(); - } catch (IOException e) - { - e.printStackTrace(); - } - } - } - return response; - } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java new file mode 100644 index 0000000..7e1e475 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/HadoopFileSystemStore.java @@ -0,0 +1,94 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HadoopFileSystemStore extends StorageService +{ + + private FileSystem hadoopFileSystem; + + // Prevents others from using default constructor. + HadoopFileSystemStore() + { + super(); + } + + /** + * Creates a new storage service on the given HDFS file system using default Java serialization. + */ + public HadoopFileSystemStore(FileSystem fs) + { + super(); + hadoopFileSystem = fs; + } + + public HadoopFileSystemStore(FileSystem fs, SerializationService serial) + { + super(serial); + hadoopFileSystem = fs; + } + + public void store(String pathName, Storable value) throws IOException + { + store(new Path(pathName), value); + } + + public void store(Path path, Storable obj) throws IOException + { + OutputStream os = hadoopFileSystem.create(path); + try + { + serializer.write(os, obj); + } finally + { + if (os != null) + { + os.close(); + } + } + } + + public <T> T recall(String pathName, Class<T> type) throws IOException + { + return recall(new Path(pathName), type); + } + + public <T> T recall(Path path, Class<T> type) throws IOException + { + InputStream is = hadoopFileSystem.open(path); + try + { + return serializer.read(is, type); + } finally + { + if (is != null) + { + is.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/JavaSerializer.java b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java new file mode 100644 index 0000000..4228c19 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/JavaSerializer.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; + +public class JavaSerializer extends SerializationService +{ + + public void write(OutputStream stream, Storable obj) throws IOException + { + ObjectOutputStream oos = new ObjectOutputStream(stream); + oos.writeObject(obj); + } + + @SuppressWarnings("unchecked") + public <T> T read(InputStream stream, Class<T> type) throws IOException + { + ObjectInputStream oin = new ObjectInputStream(stream); + try + { + return (T) oin.readObject(); + } catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/JsonSerializer.java b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java new file mode 100644 index 0000000..c33366d --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/JsonSerializer.java @@ -0,0 +1,41 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +//TODO: Waiting for Jackson adoption +public class JsonSerializer extends SerializationService +{ + + @Override + public void write(OutputStream w, Storable obj) throws IOException + { + throw new RuntimeException("Not yet implemented"); + } + + @Override + public <T> T read(InputStream stream, Class<T> type) throws IOException + { + throw new RuntimeException("Not yet implemented"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java new file mode 100644 index 0000000..50d11c3 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/LocalFileSystemStore.java @@ -0,0 +1,82 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +public class LocalFileSystemStore extends StorageService +{ + + /** + * Creates a new storage service on the local file system using default Java serialization. + */ + public LocalFileSystemStore() + { + super(); + } + + public LocalFileSystemStore(SerializationService serial) + { + super(serial); + } + + public void store(String path, Storable obj) throws IOException + { + store(new File(path), obj); + } + + public void store(File file, Storable obj) throws IOException + { + FileOutputStream fos = new FileOutputStream(file); + try + { + serializer.write(fos, obj); + } finally + { + if (fos != null) + { + fos.close(); + } + } + } + + public <T> T recall(String path, Class<T> type) throws IOException + { + return recall(new File(path), type); + } + + public <T> T recall(File file, Class<T> type) throws IOException + { + FileInputStream fis = new FileInputStream(file); + try + { + return serializer.read(fis, type); + } finally + { + if (fis != null) + { + fis.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/SerializationService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/SerializationService.java b/src/main/java/org/apache/pirk/serialization/SerializationService.java new file mode 100644 index 0000000..2764fc8 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/SerializationService.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/* + * Ability to read and write objects to/from a stream. + */ +public abstract class SerializationService +{ + public abstract <T> T read(InputStream stream, Class<T> type) throws IOException; + + public abstract void write(OutputStream w, Storable obj) throws IOException; + + public byte[] toBytes(Storable obj) + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + try + { + write(bos, obj); + } catch (IOException e) + { + throw new RuntimeException(e); + } + + return bos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/Storable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/Storable.java b/src/main/java/org/apache/pirk/serialization/Storable.java new file mode 100644 index 0000000..d9e2fb3 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/Storable.java @@ -0,0 +1,25 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +// Marker interface +public interface Storable +{ + +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/serialization/StorageService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/serialization/StorageService.java b/src/main/java/org/apache/pirk/serialization/StorageService.java new file mode 100644 index 0000000..775a313 --- /dev/null +++ b/src/main/java/org/apache/pirk/serialization/StorageService.java @@ -0,0 +1,39 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *******************************************************************************/ +package org.apache.pirk.serialization; + +abstract class StorageService +{ + SerializationService serializer; + + StorageService() + { + this.setSerializer(new JavaSerializer()); + } + + StorageService(SerializationService service) + { + this.setSerializer(service); + } + + public void setSerializer(SerializationService service) + { + serializer = service; + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java index 37cb43c..020d464 100644 --- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -36,6 +36,7 @@ import org.apache.pirk.responder.wideskies.ResponderCLI; import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.serialization.HadoopFileSystemStore; import org.apache.pirk.test.distributed.DistributedTestDriver; import org.apache.pirk.test.utils.BaseTests; import org.apache.pirk.test.utils.Inputs; @@ -342,7 +343,7 @@ public class DistTestSuite // Write the Querier object to a file Path queryInputDirPath = new Path(queryInputDir); - query.writeToHDFSFile(queryInputDirPath, fs); + new HadoopFileSystemStore(fs).store(queryInputDirPath, query); fs.deleteOnExit(queryInputDirPath); // Grab the original data and query schema properties to reset upon completion @@ -413,7 +414,7 @@ public class DistTestSuite // Perform decryption // Reconstruct the necessary objects from the files logger.info("Performing decryption; writing final results file"); - Response response = Response.readFromHDFSFile(new Path(outputFile), fs); + Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class); // Perform decryption and output the result file DecryptResponse decryptResponse = new DecryptResponse(response, querier); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/442b2179/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java index aeda7dc..c33971e 100644 --- a/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java +++ b/src/main/java/org/apache/pirk/test/utils/StandaloneQuery.java @@ -34,6 +34,7 @@ import org.apache.pirk.query.wideskies.QueryUtils; import org.apache.pirk.responder.wideskies.standalone.Responder; import org.apache.pirk.response.wideskies.Response; import org.apache.pirk.schema.response.QueryResponseJSON; +import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.json.simple.JSONObject; @@ -60,6 +61,7 @@ public class StandaloneQuery ArrayList<QueryResponseJSON> results = null; // Create the necessary files + LocalFileSystemStore storage = new LocalFileSystemStore(); String querySideOuputFilePrefix = "querySideOut"; File fileQuerier = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERIER_FILETAG, ".txt"); File fileQuery = File.createTempFile(querySideOuputFilePrefix + "-" + QuerierConst.QUERY_FILETAG, ".txt"); @@ -98,11 +100,12 @@ public class StandaloneQuery } // Write necessary output files - encryptQuery.writeOutputFiles(fileQuerier, fileQuery); + storage.store(fileQuerier, encryptQuery.getQuerier()); + storage.store(fileQuery, encryptQuery.getQuery()); // Perform the PIR query and build the response elements logger.info("Performing the PIR Query and constructing the response elements:"); - Query query = Query.readFromFile(fileQuery); + Query query = storage.recall(fileQuery, Query.class); Responder pirResponder = new Responder(query); logger.info("Query and Responder elements constructed"); for (JSONObject jsonData : dataElements) @@ -123,14 +126,14 @@ public class StandaloneQuery logger.info("Forming response from response elements; writing to a file"); pirResponder.setResponseElements(); Response responseOut = pirResponder.getResponse(); - responseOut.writeToFile(fileResponse); + storage.store(fileResponse, responseOut); logger.info("Completed forming response from response elements and writing to a file"); // Perform decryption // Reconstruct the necessary objects from the files logger.info("Performing decryption; writing final results file"); - Response responseIn = Response.readFromFile(fileResponse); - Querier querier = Querier.readFromFile(fileQuerier); + Response responseIn = storage.recall(fileResponse, Response.class); + Querier querier = storage.recall(fileQuerier, Querier.class); // Perform decryption and output the result file DecryptResponse decryptResponse = new DecryptResponse(responseIn, querier);
