RYA-453 Implement the Query Manager's Daemon that controls the application.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/16202ac7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/16202ac7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/16202ac7 Branch: refs/heads/master Commit: 16202ac7d81a37d56f9a3a54f22f79bbea3fe8d5 Parents: e355f73 Author: kchilton2 <[email protected]> Authored: Tue Jan 30 14:44:58 2018 -0500 Committer: Valiyil <[email protected]> Committed: Fri Mar 9 12:59:48 2018 -0500 ---------------------------------------------------------------------- extras/rya.streams/query-manager/pom.xml | 5 + .../querymanager/QueryManagerDaemon.java | 99 ++++++++++++++++++-- .../xml/QueryManagerConfigUnmarshaller.java | 10 +- .../xml/QueryManagerConfigMarshallerTest.java | 13 ++- 4 files changed, 110 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml index 2141a3a..deaccdb 100644 --- a/extras/rya.streams/query-manager/pom.xml +++ b/extras/rya.streams/query-manager/pom.xml @@ -47,6 +47,11 @@ under the License. <version>1.1.0</version> </dependency> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + <!-- Test dependencies --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java index 2ab0ad8..515d699 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,32 +18,117 @@ */ package org.apache.rya.streams.querymanager; +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + import org.apache.commons.daemon.Daemon; import org.apache.commons.daemon.DaemonContext; import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ @DefaultAnnotation(NonNull.class) public class QueryManagerDaemon implements Daemon { + + private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + + /** + * The default configuration file's path for the application. + */ + private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + + /** + * Command line parameters that are used by all commands that interact with Kafka. + */ + class DaemonParameters { + @Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") + public String config; + } + + private QueryManager manager = null; + @Override public void init(final DaemonContext context) throws DaemonInitException, Exception { - System.out.println("Initializing Query Manager Daemon."); + requireNonNull(context); + + // Parse the command line arguments for the configuration file to use. + final String[] args = context.getArguments(); + final DaemonParameters params = new DaemonParameters(); + try { + new JCommander(params).parse(args); + } catch(final ParameterException e) { + throw new DaemonInitException("Unable to parse the command line arguments.", e); + } + final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; + log.info("Loading the following configuration file: " + configFile); + + // Unmarshall the configuration file into an object. + final QueryManagerConfig config; + try(InputStream stream = Files.newInputStream(configFile)) { + config = QueryManagerConfigUnmarshaller.unmarshall(stream); + } catch(final JAXBException | SAXException e) { + throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e); + } + + // Read the source polling period from the configuration. + final QueryChanngeLogDiscoveryPeriod periodConfig = config.getPerformanceTunning().getQueryChanngeLogDiscoveryPeriod(); + final long period = periodConfig.getValue().longValue(); + final TimeUnit units = TimeUnit.valueOf( periodConfig.getUnits().toString() ); + log.info("Query Change Log Polling Period: " + period + " " + units); + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, period, units); + + // Initialize a QueryChangeLogSource. + final Kafka kafka = config.getQueryChangeLogSource().getKafka(); + log.info("Kafka Source: " + kafka.getHostname() + ":" + kafka.getPort()); + final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler); + + // Initialize a QueryExecutor. + final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort()); + final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory); + + // Initialize the QueryManager using the configured resources. + manager = new QueryManager(queryExecutor, source, scheduler); } @Override public void start() throws Exception { - System.out.println("Starting Query Manager Daemon."); + log.info("Starting the Rya Streams Query Manager Daemon."); + manager.startAndWait(); } @Override public void stop() throws Exception { - System.out.println("Stopping Query Manager Daemon."); + log.info("Stopping the Rya Streams Query Manager Daemon."); + manager.stopAndWait(); } @Override - public void destroy() { - System.out.println("Query Manager Daemon Destroyed."); - } + public void destroy() { } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java index 834f0b9..39de24d 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigUnmarshaller.java @@ -20,7 +20,7 @@ package org.apache.rya.streams.querymanager.xml; import static java.util.Objects.requireNonNull; -import java.io.Reader; +import java.io.InputStream; import java.net.URL; import javax.xml.XMLConstants; @@ -49,13 +49,13 @@ public class QueryManagerConfigUnmarshaller { /** * Validates and unmarshalls XML into a {@link QueryManagerConfig} object. * - * @param xmlReader - Reads the XML that will be unmarshalled. (not null) + * @param xmlStream - Reads the XML that will be unmarshalled. (not null) * @return A {@link QueryManagerConfig} loaded with the XMLs values. * @throws SAXException Could not load the schema the XML will be validated against. * @throws JAXBException Could not unmarshal the XML into a POJO. */ - public static QueryManagerConfig unmarshall(final Reader xmlReader) throws JAXBException, SAXException { - requireNonNull(xmlReader); + public static QueryManagerConfig unmarshall(final InputStream xmlStream) throws JAXBException, SAXException { + requireNonNull(xmlStream); // Get an input stream to the XSD file that is packaged inside of the jar. final URL schemaURL = ClassLoader.getSystemResource(XSD_PATH); @@ -73,6 +73,6 @@ public class QueryManagerConfigUnmarshaller { unmarshaller.setSchema(schema); // Perform the unmarshal. - return (QueryManagerConfig) unmarshaller.unmarshal(xmlReader); + return (QueryManagerConfig) unmarshaller.unmarshal(xmlStream); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/16202ac7/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java index 831c06b..f2b50ab 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java @@ -20,12 +20,15 @@ package org.apache.rya.streams.querymanager.xml; import static org.junit.Assert.assertNotNull; -import java.io.StringReader; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import javax.xml.bind.UnmarshalException; import org.junit.Test; +import com.google.common.base.Charsets; + /** * Unit tests the methods of {@link QueryManagerConfigUnmarshaller}. */ @@ -50,8 +53,8 @@ public class QueryManagerConfigMarshallerTest { " </performanceTunning>\n" + "</queryManagerConfig>"; - - final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml)); + final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)); + final QueryManagerConfig config = QueryManagerConfigUnmarshaller.unmarshall(xmlStream); assertNotNull(config); } @@ -68,7 +71,7 @@ public class QueryManagerConfigMarshallerTest { " </queryChangeLogSource>\n" + "</queryManagerConfig>"; - - QueryManagerConfigUnmarshaller.unmarshall(new StringReader(xml)); + final InputStream xmlStream = new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)); + QueryManagerConfigUnmarshaller.unmarshall(xmlStream); } } \ No newline at end of file
