Repository: incubator-pirk Updated Branches: refs/heads/master bf003cb06 -> 43c772c45
[PIRK-63]- Generalize ResponderDriver to use a RespondLauncher class. This closes apache/incubator-pirk#93 Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/43c772c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/43c772c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/43c772c4 Branch: refs/heads/master Commit: 43c772c45860393cb24e672dd7efd78e28d0de6f Parents: bf003cb Author: Darin Johnson <[email protected]> Authored: Fri Sep 23 10:50:06 2016 +0100 Committer: Tim Ellison <[email protected]> Committed: Fri Sep 23 10:50:06 2016 +0100 ---------------------------------------------------------------------- .../pirk/responder/wideskies/ResponderCLI.java | 6 +- .../responder/wideskies/ResponderDriver.java | 116 +++---------------- .../responder/wideskies/ResponderProps.java | 10 +- .../responder/wideskies/ResponderService.java | 73 ++++++++++++ .../wideskies/mapreduce/MapReduceResponder.java | 45 +++++++ .../wideskies/spark/SparkResponder.java | 55 +++++++++ .../streaming/SparkStreamingResponder.java | 92 +++++++++++++++ .../wideskies/spi/ResponderPlugin.java | 40 +++++++ .../standalone/StandaloneResponder.java | 58 ++++++++++ .../wideskies/storm/StormResponder.java | 45 +++++++ ...pirk.responder.wideskies.spi.ResponderPlugin | 5 + src/main/resources/responder.properties | 10 +- 12 files changed, 442 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java index 5ba8170..514491a 100644 --- 
a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java @@ -111,7 +111,7 @@ public class ResponderCLI * Method to parse and validate the options provided * * @return - true if valid, false otherwise - * @throws IOException + * @throws IOException */ private boolean parseOptions() throws IOException { @@ -170,14 +170,14 @@ public class ResponderCLI optionLocalPropFile.setArgName(LOCALPROPFILE); optionLocalPropFile.setType(String.class); options.addOption(optionLocalPropFile); - + // hdfsPropertiesDir Option optionHDFSPropDir = new Option("hdfsPropsDir", HDFSPROPDIR, true, "Optional location of directory in hdfs containing properties file(s)"); optionHDFSPropDir.setRequired(false); optionHDFSPropDir.setArgName(HDFSPROPDIR); optionHDFSPropDir.setType(String.class); options.addOption(optionHDFSPropDir); - + // hdfsPropertiesFile Option optionHDFSPropFile = new Option("hdfsPropsFile", HDFSPROPFILE, true, "Optional location of properties file(s) in hdfs"); optionHDFSPropFile.setRequired(false); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java index 044012d..02dbf2e 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java @@ -18,18 +18,8 @@ */ package org.apache.pirk.responder.wideskies; -import java.security.Permission; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.util.ToolRunner; -import org.apache.pirk.query.wideskies.Query; -import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool; 
-import org.apache.pirk.responder.wideskies.spark.ComputeResponse; -import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse; -import org.apache.pirk.responder.wideskies.standalone.Responder; -import org.apache.pirk.responder.wideskies.storm.PirkTopology; -import org.apache.pirk.serialization.LocalFileSystemStore; +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.apache.pirk.utils.PIRException; import org.apache.pirk.utils.SystemConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,103 +40,31 @@ public class ResponderDriver { private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class); - private enum Platform - { - MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE - } - - public static void main(String[] args) throws Exception + public static void main(String[] args) { ResponderCLI responderCLI = new ResponderCLI(args); - // For handling System.exit calls from Spark Streaming - System.setSecurityManager(new SystemExitManager()); - - Platform platform = Platform.NONE; - String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM); + String platformName = SystemConfiguration.getProperty(ResponderProps.PLATFORM, "None"); + logger.info("Attempting to use platform {} ...", platformName); try { - platform = Platform.valueOf(platformString.toUpperCase()); - } catch (IllegalArgumentException e) - { - logger.error("platform " + platformString + " not found."); - } - - logger.info("platform = " + platform); - switch (platform) - { - case MAPREDUCE: - logger.info("Launching MapReduce ResponderTool:"); - - ComputeResponseTool pirWLTool = new ComputeResponseTool(); - ToolRunner.run(pirWLTool, new String[] {}); - break; - - case SPARK: - logger.info("Launching Spark ComputeResponse:"); - - ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration())); - computeResponse.performQuery(); - break; - - case SPARKSTREAMING: - 
logger.info("Launching Spark ComputeStreamingResponse:"); - - ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration())); - try - { - computeSR.performQuery(); - } catch (SystemExitException e) - { - // If System.exit(0) is not caught from Spark Streaming, - // the application will complete with a 'failed' status - logger.info("Exited with System.exit(0) from Spark Streaming"); - } - - // Teardown the context - computeSR.teardown(); - break; - - case STORM: - logger.info("Launching Storm PirkTopology:"); - PirkTopology.runPirkTopology(); - break; - - case STANDALONE: - logger.info("Launching Standalone Responder:"); - - String queryInput = SystemConfiguration.getProperty("pir.queryInput"); - Query query = new LocalFileSystemStore().recall(queryInput, Query.class); - - Responder pirResponder = new Responder(query); - pirResponder.computeStandaloneResponse(); - break; - } - } - - // Exception and Security Manager classes used to catch System.exit from Spark Streaming - private static class SystemExitException extends SecurityException - {} - - private static class SystemExitManager extends SecurityManager - { - @Override - public void checkPermission(Permission perm) - {} - - @Override - public void checkExit(int status) - { - super.checkExit(status); - if (status == 0) // If we exited cleanly, throw SystemExitException + ResponderPlugin responder = ResponderService.getInstance().getResponder(platformName); + if (responder == null) { - throw new SystemExitException(); + logger.error("No such platform plugin found: {}!", platformName); } else { - throw new SecurityException(); + responder.run(); } - + } + catch (PIRException pirEx) + { + logger.error("Failed to load platform plugin: {}! {}", platformName, pirEx.getMessage()); + } + catch (Exception ex) + { + logger.error("Failed to run platform plugin: {}! 
{}", platformName, ex); } } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java index 7cba88e..a9eb80d 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java @@ -128,15 +128,7 @@ public class ResponderProps if (!SystemConfiguration.hasProperty(PLATFORM)) { - logger.info("Must have the option " + PLATFORM); - valid = false; - } - - String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase(); - if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm") - && !platform.equals("standalone")) - { - logger.info("Unsupported platform: " + platform); + logger.info("Must have the option {}", PLATFORM); valid = false; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java new file mode 100644 index 0000000..129b1c9 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderService.java @@ -0,0 +1,73 @@ +package org.apache.pirk.responder.wideskies; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; + +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.apache.pirk.utils.PIRException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResponderService +{ + private static final Logger logger = LoggerFactory.getLogger(ResponderService.class); + + // Singleton for the responder service. 
+ private static ResponderService service; + private ServiceLoader<ResponderPlugin> loader; + + public static synchronized ResponderService getInstance() + { + if (service == null) + { + service = new ResponderService(); + } + return service; + } + + private ResponderService() + { + loader = ServiceLoader.load(ResponderPlugin.class); + } + + public ResponderPlugin getResponder(String platformName) throws PIRException + { + try + { + for(ResponderPlugin plugin : loader) + { + if (platformName.equalsIgnoreCase(plugin.getPlatformName())) + { + logger.debug("Found {}, in {}", platformName, plugin.getClass().getName()); + return plugin; + } + } + } + catch (ServiceConfigurationError e) + { + logger.error("ResponderPlugin configuration error {}", e); + throw new PIRException(e); + } + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java new file mode 100644 index 0000000..fc1d20b --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/MapReduceResponder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.mapreduce; + +import org.apache.hadoop.util.ToolRunner; +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to launch Map Reduce responder + */ +public class MapReduceResponder implements ResponderPlugin +{ + private static final Logger logger = LoggerFactory.getLogger(MapReduceResponder.class); + + @Override + public String getPlatformName() { + return "mapreduce"; + } + + @Override + public void run() throws Exception + { + logger.info("Launching MapReduce ResponderTool:"); + ComputeResponseTool pirWLTool = new ComputeResponseTool(); + ToolRunner.run(pirWLTool, new String[] {}); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java new file mode 100644 index 0000000..bd05236 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/SparkResponder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.spark; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to launch spark responder + */ +public class SparkResponder implements ResponderPlugin +{ + private static final Logger logger = LoggerFactory.getLogger(SparkResponder.class); + + @Override + public String getPlatformName() { + return "spark"; + } + + @Override + public void run() throws Exception + { + logger.info("Launching Spark ComputeResponse:"); + try + { + ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration())); + computeResponse.performQuery(); + } + catch (IOException e) + { + logger.error("Unable to open filesystem: {}", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java new file mode 100644 index 0000000..295a3cf --- /dev/null +++ 
b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/SparkStreamingResponder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.spark.streaming; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.pirk.responder.wideskies.ResponderDriver; +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.Permission; + +/** + * Class to launch stand alone responder + */ +public class SparkStreamingResponder implements ResponderPlugin +{ + private static final Logger logger = LoggerFactory.getLogger(SparkStreamingResponder.class); + + @Override + public String getPlatformName() { + return "sparkstreaming"; + } + + @Override + public void run() throws Exception + { + // For handling System.exit calls from Spark Streaming + System.setSecurityManager(new SystemExitManager()); + logger.info("Launching Spark ComputeStreamingResponse:"); + ComputeStreamingResponse computeSR = null; + try + { + computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration())); + computeSR.performQuery(); + } + 
catch (SystemExitException e) + { + // If System.exit(0) is not caught from Spark Streaming, + // the application will complete with a 'failed' status + logger.info("Exited with System.exit(0) from Spark Streaming"); + } + finally + { + // Teardown the context + if (computeSR != null) + computeSR.teardown(); + } + } + + // Exception and Security Manager classes used to catch System.exit from Spark Streaming + private static class SystemExitException extends SecurityException + {} + + private static class SystemExitManager extends SecurityManager + { + @Override + public void checkPermission(Permission perm) + {} + + @Override + public void checkExit(int status) + { + super.checkExit(status); + if (status == 0) // If we exited cleanly, throw SystemExitException + { + throw new SystemExitException(); + } + else + { + throw new SecurityException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java new file mode 100644 index 0000000..3dade0d --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/spi/ResponderPlugin.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.responder.wideskies.spi; + +/** + * Interface which launches a responder + * <p> + * Implement this interface to start the execution of a framework responder, the run method will be called via reflection by the ResponderDriver. + * </p> + */ +public interface ResponderPlugin +{ + /** + * Returns the plugin name for your framework + * This will be the platform argument + * @return + */ + public String getPlatformName(); + /** + * This method launches your framework responder. + */ + public void run() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java new file mode 100644 index 0000000..5214c5f --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/standalone/StandaloneResponder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.standalone; + +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.apache.pirk.serialization.LocalFileSystemStore; +import org.apache.pirk.utils.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class to launch stand alone responder + */ +public class StandaloneResponder implements ResponderPlugin +{ + private static final Logger logger = LoggerFactory.getLogger(StandaloneResponder.class); + + @Override + public String getPlatformName() { + return "standalone"; + } + + @Override + public void run() + { + logger.info("Launching Standalone Responder:"); + String queryInput = SystemConfiguration.getProperty("pir.queryInput"); + try + { + Query query = new LocalFileSystemStore().recall(queryInput, Query.class); + Responder pirResponder = new Responder(query); + pirResponder.computeStandaloneResponse(); + } + catch (IOException e) + { + logger.error("Error reading {}, {}", queryInput, e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java new file mode 100644 index 0000000..08400ac --- /dev/null +++ 
b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormResponder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pirk.responder.wideskies.storm; + +import org.apache.pirk.responder.wideskies.spi.ResponderPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to launch Storm responder + */ +public class StormResponder implements ResponderPlugin +{ + + private static final Logger logger = LoggerFactory.getLogger(StormResponder.class); + @Override + public String getPlatformName() + { + return "storm"; + } + + @Override + public void run() throws Exception + { + logger.info("Launching Storm PirkTopology:"); + PirkTopology.runPirkTopology(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin ---------------------------------------------------------------------- diff --git a/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin new file mode 100644 index 0000000..33aff36 --- /dev/null 
+++ b/src/main/resources/META-INF/services/org.apache.pirk.responder.wideskies.spi.ResponderPlugin @@ -0,0 +1,5 @@ +org.apache.pirk.responder.wideskies.mapreduce.MapReduceResponder +org.apache.pirk.responder.wideskies.spark.SparkResponder +org.apache.pirk.responder.wideskies.spark.streaming.SparkStreamingResponder +org.apache.pirk.responder.wideskies.standalone.StandaloneResponder +org.apache.pirk.responder.wideskies.storm.StormResponder \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/43c772c4/src/main/resources/responder.properties ---------------------------------------------------------------------- diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties index ac6cb35..bb38b1d 100644 --- a/src/main/resources/responder.properties +++ b/src/main/resources/responder.properties @@ -27,9 +27,15 @@ pir.dataInputFormat= #outputFile -- required -- Fully qualified name of output file in hdfs pir.outputFile= -#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone' +#One of the following two options is required - launcher preferred + +#launcher -- required -- full class name of a class implementing ResponderPlugin +#e.g. org.apache.pirk.responder.wideskies.standalone.StandaloneResponder -- Processing platform technology for the responder +#launcher= + +#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', 'standalone', or 'storm' #Processing platform technology for the responder -platform= +platform= #queryInput -- required -- Fully qualified dir in hdfs of Query files pir.queryInput=
