Repository: incubator-pirk
Updated Branches:
  refs/heads/master 43c772c45 -> 98b1b4c36
Narrow ResponderPlugin run method to throw PIRException -- closes apache/incubator-pirk#99


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/98b1b4c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/98b1b4c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/98b1b4c3

Branch: refs/heads/master
Commit: 98b1b4c36d5ff10ec86f7e24225c5c42ac6004fa
Parents: 43c772c
Author: tellison <[email protected]>
Authored: Fri Sep 23 18:39:31 2016 -0400
Committer: eawilliams <[email protected]>
Committed: Fri Sep 23 18:39:31 2016 -0400

----------------------------------------------------------------------
 .../mapreduce/ComputeResponseTool.java          |  3 +-
 .../wideskies/mapreduce/MapReduceResponder.java | 14 +++-
 .../wideskies/spark/ComputeResponse.java        | 27 ++++---
 .../wideskies/spark/SparkResponder.java         |  3 +-
 .../streaming/ComputeStreamingResponse.java     | 52 ++++++++-----
 .../streaming/SparkStreamingResponder.java      | 28 +++++--
 .../wideskies/spi/ResponderPlugin.java          |  4 +-
 .../responder/wideskies/storm/PirkTopology.java | 19 ++++-
 .../wideskies/storm/StormResponder.java         |  3 +-
 .../pirk/schema/data/DataSchemaLoader.java      | 81 ++++++++++----------
 .../pirk/schema/query/QuerySchemaLoader.java    | 81 ++++++++++----------
 11 files changed, 189 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
index 80e8a13..21cf518 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/ComputeResponseTool.java
@@ -54,6 +54,7 @@ import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
 import org.apache.pirk.utils.FileConst;
 import org.apache.pirk.utils.HDFS;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.elasticsearch.hadoop.mr.EsInputFormat;
 import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class ComputeResponseTool extends Configured implements Tool
   private QueryInfo queryInfo = null;
   private QuerySchema qSchema = null;
 
-  public ComputeResponseTool() throws Exception
+  public ComputeResponseTool() throws IOException, PIRException
   {
     setupParameters();
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
index fc1d20b..fcbc88b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java
@@ -20,6 +20,7 @@ package org.apache.pirk.responder.wideskies.mapreduce;
 
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,10 +37,17 @@ public class MapReduceResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching MapReduce ResponderTool:");
-    ComputeResponseTool pirWLTool = new ComputeResponseTool();
-    ToolRunner.run(pirWLTool, new String[] {});
+    try
+    {
+      ComputeResponseTool pirWLTool = new ComputeResponseTool();
+      ToolRunner.run(pirWLTool, new String[] {});
+    } catch (Exception e)
+    {
+      // An exception occurred invoking the tool, don't know how to recover.
+      throw new PIRException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index d6593f7..00bc5c1 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -99,7 +99,7 @@ public class ComputeResponse
 
   private boolean colMultReduceByKey = false;
 
-  public ComputeResponse(FileSystem fileSys) throws Exception
+  public ComputeResponse(FileSystem fileSys) throws PIRException
   {
     fs = fileSys;
     storage = new HadoopFileSystemStore(fs);
@@ -156,12 +156,18 @@ public class ComputeResponse
 
     // Setup, run query, teardown
     logger.info("Setting up for query run");
-    setup();
+    try
+    {
+      setup();
+    } catch (IOException e)
+    {
+      throw new PIRException("An error occurred setting up the Spark responder.", e);
+    }
     logger.info("Setup complete");
   }
 
   // Setup for the accumulators and broadcast variables
-  private void setup() throws Exception
+  private void setup() throws IOException, PIRException
   {
     // Load the schemas
     DataSchemaLoader.initialize(true, fs);
@@ -219,7 +225,7 @@ public class ComputeResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws Exception
+  public void performQuery() throws IOException, PIRException
   {
     logger.info("Performing query: ");
 
@@ -243,7 +249,7 @@ public class ComputeResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaRDD<MapWritable> readData() throws Exception
+  public JavaRDD<MapWritable> readData() throws IOException, PIRException
   {
     logger.info("Reading data ");
 
@@ -268,10 +274,13 @@ public class ComputeResponse
 
     // Set the inputFormatClass based upon the baseInputFormat property
     String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
-    Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
-    if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+    Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+    try
+    {
+      inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+    } catch (ClassNotFoundException | ClassCastException e)
     {
-      throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
     }
     job.setInputFormatClass(inputClass);
 
@@ -296,7 +305,7 @@ public class ComputeResponse
    * Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaRDD<MapWritable> readDataES() throws Exception
+  public JavaRDD<MapWritable> readDataES() throws IOException, PIRException
   {
     logger.info("Reading data ");
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
index bd05236..fce905d 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,7 @@ public class SparkResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching Spark ComputeResponse:");
     try

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
index b91cc68..c291df0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -43,10 +43,12 @@ import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
 import org.apache.pirk.schema.data.DataSchema;
 import org.apache.pirk.schema.data.DataSchemaLoader;
 import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.query.QuerySchema;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaRegistry;
 import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -106,7 +108,7 @@ public class ComputeStreamingResponse
 
   private boolean colMultReduceByKey = false;
 
-  public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+  public ComputeStreamingResponse(FileSystem fileSys) throws PIRException
   {
     fs = fileSys;
     storage = new HadoopFileSystemStore(fs);
@@ -171,12 +173,18 @@ public class ComputeStreamingResponse
 
     // Setup, run query, teardown
     logger.info("Setting up for query run");
-    setup();
+    try
+    {
+      setup();
+    } catch (IOException e)
+    {
+      throw new PIRException("An error occurred setting up the streaming responder.", e);
+    }
     logger.info("Setup complete");
   }
 
   // Setup for the accumulators and broadcast variables
-  private void setup() throws Exception
+  private void setup() throws IOException, PIRException
   {
     // Load the schemas
     DataSchemaLoader.initialize(true, fs);
@@ -191,11 +199,6 @@
     QueryInfo queryInfo = query.getQueryInfo();
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);
-
-    if(query == null)
-    {
-      logger.info("query is null for queryInput = " + queryInput);
-    }
 
     if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
     {
@@ -235,15 +238,20 @@ public class ComputeStreamingResponse
 
   /**
    * Method to start the computation
-   *
-   * @throws InterruptedException
    */
-  public void start() throws InterruptedException
+  public void start()
   {
     logger.info("Starting computation...");
 
     jssc.start();
-    jssc.awaitTermination();
+    try
+    {
+      jssc.awaitTermination();
+    } catch (InterruptedException e)
+    {
+      // Interrupted while waiting for termination
+      Thread.interrupted();
+    }
   }
 
 /**
@@ -259,7 +267,7 @@ public class ComputeStreamingResponse
   /**
    * Method to read in data from an allowed input source/format and perform the query
    */
-  public void performQuery() throws IOException, ClassNotFoundException, InterruptedException
+  public void performQuery() throws IOException, PIRException
   {
     logger.info("Performing query: ");
 
@@ -272,6 +280,9 @@ public class ComputeStreamingResponse
     {
       inputRDD = readDataES();
     }
+    else {
+      throw new PIRException("Unknown data input format " + dataInputFormat);
+    }
 
     performQuery(inputRDD);
   }
@@ -280,7 +291,7 @@ public class ComputeStreamingResponse
    * Method to read in the data from an allowed input format, filter, and return a RDD of MapWritable data elements
    */
   @SuppressWarnings("unchecked")
-  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, IOException
+  public JavaDStream<MapWritable> readData() throws IOException, PIRException
   {
     logger.info("Reading data ");
 
@@ -296,10 +307,13 @@ public class ComputeStreamingResponse
 
     // Set the inputFormatClass based upon the baseInputFormat property
     String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
-    Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
-    if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+    Class<? extends BaseInputFormat<Text,MapWritable>> inputClass;
+    try
+    {
+      inputClass = (Class<? extends BaseInputFormat<Text,MapWritable>>) Class.forName(classString);
+    } catch (ClassNotFoundException | ClassCastException e)
     {
-      throw new ClassCastException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+      throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
     }
     job.setInputFormatClass(inputClass);
 
@@ -397,10 +411,8 @@ public class ComputeStreamingResponse
 
   /**
    * Method to perform the query given an input JavaDStream of JSON
    *
-   * @throws InterruptedException
-   *
    */
-  public void performQuery(JavaDStream<MapWritable> input) throws InterruptedException
+  public void performQuery(JavaDStream<MapWritable> input)
   {
     logger.info("Performing query: ");
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
index 295a3cf..4ce0571 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java
@@ -18,15 +18,16 @@
  */
 package org.apache.pirk.responder.wideskies.spark.streaming;
 
+import java.io.IOException;
+import java.security.Permission;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.pirk.responder.wideskies.ResponderDriver;
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.security.Permission;
-
 /**
  * Class to launch stand alone responder
  */
@@ -40,15 +41,25 @@ public class SparkStreamingResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     // For handling System.exit calls from Spark Streaming
     System.setSecurityManager(new SystemExitManager());
+
+    FileSystem fileSys;
+    try
+    {
+      fileSys = FileSystem.get(new Configuration());
+    } catch (IOException e)
+    {
+      throw new PIRException(e);
+    }
+
     logger.info("Launching Spark ComputeStreamingResponse:");
     ComputeStreamingResponse computeSR = null;
     try
     {
-      computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+      computeSR = new ComputeStreamingResponse(fileSys);
       computeSR.performQuery();
     }
     catch (SystemExitException e)
@@ -56,6 +67,9 @@ public class SparkStreamingResponder implements ResponderPlugin
       // If System.exit(0) is not caught from Spark Streaming,
       // the application will complete with a 'failed' status
       logger.info("Exited with System.exit(0) from Spark Streaming");
+    } catch (IOException e)
+    {
+      throw new PIRException(e);
     }
     finally
     {
@@ -67,7 +81,9 @@ public class SparkStreamingResponder implements ResponderPlugin
 
   // Exception and Security Manager classes used to catch System.exit from Spark Streaming
   private static class SystemExitException extends SecurityException
-  {}
+  {
+    private static final long serialVersionUID = 1L;
+  }
 
   private static class SystemExitManager extends SecurityManager
   {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
index 3dade0d..912850a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java
@@ -19,6 +19,8 @@ package org.apache.pirk.responder.wideskies.spi;
 
+import org.apache.pirk.utils.PIRException;
+
 /**
  * Interface which launches a responder
  * <p>
@@ -36,5 +38,5 @@ public interface ResponderPlugin
   /**
    * This method launches your framework responder.
    */
-  public void run() throws Exception;
+  public void run() throws PIRException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
index e0f83d3..fd23c7e 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -19,11 +19,18 @@ package org.apache.pirk.responder.wideskies.storm;
 
 import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.PIRException;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaSpout;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
@@ -61,7 +68,7 @@ public class PirkTopology
   private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
   private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
 
-  public static void runPirkTopology() throws Exception
+  public static void runPirkTopology() throws PIRException
   {
     // Set up Kafka parameters
     logger.info("Configuring Kafka.");
@@ -85,7 +92,13 @@ public class PirkTopology
 
     // Run topology
     logger.info("Submitting Pirk topology to Storm...");
-    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+    try
+    {
+      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e)
+    {
+      throw new PIRException(e);
+    }
 
   } // main

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
index 08400ac..b988ccc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java
@@ -20,6 +20,7 @@ package org.apache.pirk.responder.wideskies.storm;
 
 import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
+import org.apache.pirk.utils.PIRException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,7 @@ public class StormResponder implements ResponderPlugin
   }
 
   @Override
-  public void run() throws Exception
+  public void run() throws PIRException
   {
     logger.info("Launching Storm PirkTopology:");
     PirkTopology.runPirkTopology();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 4bac8b7..d1a4797 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -19,7 +19,6 @@
 package org.apache.pirk.schema.data;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -79,14 +78,12 @@ public class DataSchemaLoader
   static
   {
     logger.info("Loading pre-configured data schemas: ");
-
     try
     {
       initialize();
-    } catch (Exception e)
+    } catch (PIRException e)
     {
-      logger.error("Caught exception: ");
-      e.printStackTrace();
+      logger.error(e.getLocalizedMessage());
     }
   }
 
@@ -94,10 +91,10 @@ public class DataSchemaLoader
   /**
   * Initializes the static {@link DataSchemaRegistry} with a list of available data schema names.
   *
-   * @throws Exception
+   * @throws PIRException
   *           - failed to initialize
   */
-  public static void initialize() throws Exception
+  public static void initialize() throws PIRException
   {
     initialize(false, null);
   }
@@ -110,10 +107,10 @@ public class DataSchemaLoader
   *          If true, specifies that the data schema is an hdfs file; if false, that it is a regular file.
   * @param fs
   *          Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the data schema exists
-   * @throws Exception
-   *           - failed to initialize
+   * @throws PIRException
+   *           - failed to initialize the data schemas because they could not be read or are invalid.
   */
-  public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+  public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
   {
     String dataSchemas = SystemConfiguration.getProperty("data.schemas", "none");
     if (dataSchemas.equals("none"))
     {
@@ -122,41 +119,43 @@ public class DataSchemaLoader
     }
 
     String[] dataSchemaFiles = dataSchemas.split(",");
-    for (String schemaFile : dataSchemaFiles)
+    try
     {
-      logger.info("Loading schemaFile = " + schemaFile + " hdfs = " + hdfs);
-
-      // Parse and load the schema file into a DataSchema object; place in the schemaMap
-      DataSchemaLoader loader = new DataSchemaLoader();
-      InputStream is = null;
-      if (hdfs)
+      for (String schemaFile : dataSchemaFiles)
       {
-        logger.info("hdfs: filePath = " + schemaFile);
-        is = fs.open(fs.makeQualified(new Path(schemaFile)));
-      }
-      else
-      {
-        try
-        {
-          is = new FileInputStream(schemaFile);
-          logger.info("localFS: inputFile = " + schemaFile);
-        } catch (FileNotFoundException e)
-        {
-          logger.info("localFS: inputFile = " + schemaFile + " not found");
-        }
+        DataSchema dataSchema = readSchemaFile(schemaFile, fs, hdfs);
+        DataSchemaRegistry.put(dataSchema);
       }
+    } catch (IOException e)
+    {
+      throw new PIRException("Error reading data schema", e);
+    }
+  }
 
-      if (is != null)
-      {
-        try
-        {
-          DataSchema dataSchema = loader.loadSchema(is);
-          DataSchemaRegistry.put(dataSchema);
-        } finally
-        {
-          is.close();
-        }
-      }
+  private static DataSchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+  {
+    logger.info("Loading data schemaFile = " + schemaFile + " hdfs = " + hdfs);
+
+    // Parse and load the schema file into a DataSchema object; place in the schemaMap
+    DataSchemaLoader loader = new DataSchemaLoader();
+    InputStream is;
+    if (hdfs)
+    {
+      logger.info("hdfs: filePath = " + schemaFile);
+      is = fs.open(fs.makeQualified(new Path(schemaFile)));
+    }
+    else
+    {
+      logger.info("localFS: inputFile = " + schemaFile);
+      is = new FileInputStream(schemaFile);
+    }
+
+    try
+    {
+      return loader.loadSchema(is);
+    } finally
+    {
+      is.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/98b1b4c3/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index 433a303..c1b4139 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -19,7 +19,6 @@
 package org.apache.pirk.schema.query;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
@@ -85,10 +84,9 @@ public class QuerySchemaLoader
     try
     {
       initialize();
-    } catch (Exception e)
+    } catch (PIRException e)
     {
-      logger.error("Caught exception: ");
-      e.printStackTrace();
+      logger.error(e.getLocalizedMessage());
     }
   }
 
@@ -96,10 +94,10 @@ public class QuerySchemaLoader
   /**
   * Initializes the static {@link QuerySchemaRegistry} with a list of query schema names.
   *
-   * @throws Exception
+   * @throws PIRException
   *           - failed to initialize
   */
-  public static void initialize() throws Exception
+  public static void initialize() throws PIRException
   {
     initialize(false, null);
   }
@@ -112,52 +110,55 @@ public class QuerySchemaLoader
   *          If true, specifies that the query schema is an hdfs file; if false, that it is a regular file.
   * @param fs
   *          Used only when {@code hdfs} is true; the {@link FileSystem} handle for the hdfs in which the query schema exists
-   * @throws Exception
-   *           - failed to initialize
+   * @throws PIRException
+   *           - failed to initialize the query schemas because they could not be read or are invalid.
   */
-  public static void initialize(boolean hdfs, FileSystem fs) throws Exception
+  public static void initialize(boolean hdfs, FileSystem fs) throws PIRException
   {
     String querySchemas = SystemConfiguration.getProperty("query.schemas", "none");
     if (querySchemas.equals("none"))
     {
       return;
     }
+
     String[] querySchemaFiles = querySchemas.split(",");
-    for (String schemaFile : querySchemaFiles)
+    try
     {
-      logger.info("Loading schemaFile = " + schemaFile);
-
-      // Parse and load the schema file into a QuerySchema object; place in the schemaMap
-      QuerySchemaLoader loader = new QuerySchemaLoader();
-      InputStream is = null;
-      if (hdfs)
+      for (String schemaFile : querySchemaFiles)
      {
-        is = fs.open(new Path(schemaFile));
-        logger.info("hdfs: filePath = " + schemaFile);
-      }
-      else
-      {
-        try
-        {
-          is = new FileInputStream(schemaFile);
-          logger.info("localFS: inputFile = " + schemaFile);
-        } catch (FileNotFoundException e)
-        {
-          logger.info("localFS: inputFile = " + schemaFile + " not found");
-        }
+        QuerySchema querySchema = readSchemaFile(schemaFile, fs, hdfs);
+        QuerySchemaRegistry.put(querySchema);
       }
+    } catch (IOException e)
+    {
+      throw new PIRException("Error reading query schema", e);
+    }
+  }
 
-      if (is != null)
-      {
-        try
-        {
-          QuerySchema querySchema = loader.loadSchema(is);
-          QuerySchemaRegistry.put(querySchema);
-        } finally
-        {
-          is.close();
-        }
-      }
+  private static QuerySchema readSchemaFile(String schemaFile, FileSystem fs, boolean hdfs) throws IOException, PIRException
+  {
+    logger.info("Loading query schemaFile = " + schemaFile);
+
+    // Parse and load the schema file into a QuerySchema object.
+    QuerySchemaLoader loader = new QuerySchemaLoader();
+    InputStream is;
+    if (hdfs)
+    {
+      logger.info("hdfs: filePath = " + schemaFile);
+      is = fs.open(new Path(schemaFile));
+    }
+    else
+    {
+      logger.info("localFS: inputFile = " + schemaFile);
+      is = new FileInputStream(schemaFile);
+    }
+
+    try
+    {
+      return loader.loadSchema(is);
+    } finally
+    {
+      is.close();
    }
  }
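
----------------------------------------------------------------------

A note for plugin authors: the practical effect of this change is a tighter
SPI contract. ResponderPlugin.run() now declares PIRException instead of
Exception, so each responder translates its framework's checked exceptions
at the run() boundary, as MapReduceResponder and PirkTopology do above.
Below is a minimal sketch of a conforming implementation; ExampleResponder
and launchExampleFramework() are hypothetical names, the PIRException
constructors are the ones used throughout this diff, and any SPI members
beyond run() (the diff shows only the interface's tail) would need
implementing as well.

  import org.apache.pirk.responder.wideskies.spi.ResponderPlugin;
  import org.apache.pirk.utils.PIRException;

  // Hypothetical plugin illustrating the narrowed contract; not part of Pirk.
  public class ExampleResponder implements ResponderPlugin
  {
    @Override
    public void run() throws PIRException
    {
      try
      {
        launchExampleFramework();
      } catch (Exception e)
      {
        // Wrap checked and unexpected failures so SPI callers see only
        // PIRException, with the original failure preserved as the cause.
        throw new PIRException("Example responder failed to launch", e);
      }
    }

    // Stand-in for framework-specific startup (MapReduce, Spark, Storm, ...).
    private void launchExampleFramework() throws Exception
    {
    }
  }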

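Two smaller observations on the new code. First, the cast to
Class<? extends BaseInputFormat<Text,MapWritable>> in the readData() methods
is an unchecked generic cast, so the ClassCastException arm of the new catch
guards the call site defensively rather than being raised by the cast
itself; the standard library's Class.asSubclass performs the subtype check
eagerly and does throw ClassCastException. A sketch of that variant, reusing
the classString property read shown above (type parameters elided, so it
compiles with a raw-type warning):

  // Resolve the configured class and verify the BaseInputFormat bound up
  // front; asSubclass throws ClassCastException for non-subtypes.
  Class<? extends BaseInputFormat> inputClass;
  try
  {
    inputClass = Class.forName(classString).asSubclass(BaseInputFormat.class);
  } catch (ClassNotFoundException | ClassCastException e)
  {
    throw new PIRException(classString + " cannot be instantiated or does not extend BaseInputFormat", e);
  }
  job.setInputFormatClass(inputClass);

Second, the InterruptedException handler added to
ComputeStreamingResponse.start() calls Thread.interrupted(), which reads and
clears the thread's interrupt flag; the conventional alternative,
Thread.currentThread().interrupt(), restores the flag so that callers
further up the stack can still observe the interruption.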