RYA-356 Added a Twill App for running the periodic service. Closes #248.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8acd24b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8acd24b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8acd24b5 Branch: refs/heads/master Commit: 8acd24b5ec477f7943453e74d753dab03be99352 Parents: b372ebc Author: jdasch <[email protected]> Authored: Thu Sep 7 16:57:13 2017 -0400 Committer: Caleb Meier <[email protected]> Committed: Tue Nov 7 07:49:12 2017 -0800 ---------------------------------------------------------------------- common/rya.api/pom.xml | 10 +- .../apache/rya/api/client/LoadStatements.java | 41 ++ .../org/apache/rya/api/client/RyaClient.java | 18 +- .../org/apache/rya/api/resolver/RyaContext.java | 65 +-- extras/indexing/pom.xml | 5 + .../client/accumulo/AccumuloLoadStatements.java | 123 +++++ .../accumulo/AccumuloRyaClientFactory.java | 1 + .../pcj/matching/AccumuloIndexSetProvider.java | 15 +- extras/periodic.notification/api/pom.xml | 48 +- .../api/PeriodicNotificationClient.java | 17 +- .../KafkaNotificationRegistrationClient.java | 34 +- .../serialization/BindingSetSerDe.java | 9 +- .../CommandNotificationSerializer.java | 30 +- extras/periodic.notification/pom.xml | 55 +-- extras/periodic.notification/service/pom.xml | 72 ++- .../PeriodicNotificationApplication.java | 76 +++- ...dicNotificationApplicationConfiguration.java | 142 +++--- .../PeriodicNotificationApplicationFactory.java | 81 ++-- .../exporter/KafkaExporterExecutor.java | 41 +- .../KafkaPeriodicBindingSetExporter.java | 57 ++- .../NotificationProcessorExecutor.java | 49 +- .../TimestampedNotificationProcessor.java | 82 ++-- .../notification/pruner/AccumuloBinPruner.java | 28 +- .../notification/pruner/FluoBinPruner.java | 26 +- .../pruner/PeriodicQueryPruner.java | 64 +-- .../pruner/PeriodicQueryPrunerExecutor.java | 31 +- .../kafka/KafkaNotificationProvider.java | 29 +- 
.../kafka/PeriodicNotificationConsumer.java | 44 +- extras/periodic.notification/tests/pom.xml | 30 +- .../PeriodicNotificationApplicationIT.java | 2 +- .../src/test/resources/notification.properties | 25 +- .../periodic.notification/twill.yarn/README.md | 18 + extras/periodic.notification/twill.yarn/pom.xml | 98 ++++ .../src/main/assembly/binary-release.xml | 30 ++ .../src/main/assembly/component-release.xml | 104 +++++ .../src/main/config/hadoop/core-site.xml | 25 ++ .../src/main/config/hadoop/yarn-site.xml | 25 ++ .../twill.yarn/src/main/config/logback.xml | 57 +++ .../src/main/config/notification.properties | 67 +++ .../twill.yarn/src/main/config/twill-env.sh | 63 +++ .../yarn/PeriodicNotificationTwillRunner.java | 315 +++++++++++++ .../scripts/periodicNotificationTwillApp.sh | 32 ++ extras/periodic.notification/twill/README.md | 36 ++ extras/periodic.notification/twill/pom.xml | 177 ++++++++ .../twill/PeriodicNotificationTwillApp.java | 57 +++ .../PeriodicNotificationTwillRunnable.java | 119 +++++ extras/rya.benchmark/README.md | 77 ++++ extras/rya.benchmark/pom.xml | 25 +- .../src/main/assembly/binary-release.xml | 33 ++ .../src/main/assembly/component-release.xml | 81 ++++ .../src/main/config/common.options | 44 ++ .../src/main/config/log4j.properties | 41 ++ .../src/main/config/periodic.options | 49 ++ .../src/main/config/projection.options | 36 ++ .../benchmark/periodic/BenchmarkOptions.java | 78 ++++ .../periodic/BenchmarkStatementGenerator.java | 90 ++++ .../rya/benchmark/periodic/CommonOptions.java | 117 +++++ .../periodic/KafkaLatencyBenchmark.java | 445 +++++++++++++++++++ .../periodic/PeriodicQueryCommand.java | 70 +++ .../periodic/ProjectionQueryCommand.java | 31 ++ .../scripts/periodicNotificationBenchmark.sh | 32 ++ .../scripts/projectionNotificationBenchmark.sh | 32 ++ extras/rya.export/export.client/conf/config.xml | 18 +- .../AccumuloPeriodicQueryResultStorage.java | 95 ++-- .../rya.manual/src/site/markdown/pcj-updater.md | 18 +- 
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 5 +- .../pcj/fluo/app/FilterResultUpdater.java | 14 +- .../app/batch/AbstractSpanBatchInformation.java | 4 +- .../fluo/app/batch/JoinBatchInformation.java | 9 +- .../export/kafka/KafkaBindingSetExporter.java | 14 +- .../kafka/KafkaBindingSetExporterFactory.java | 14 +- .../export/kafka/KafkaExportParameterBase.java | 5 +- .../export/kafka/KafkaRyaSubGraphExporter.java | 23 +- .../kafka/KafkaRyaSubGraphExporterFactory.java | 13 +- .../KryoVisibilityBindingSetSerializer.java | 46 +- .../rya/RyaBindingSetExporterFactory.java | 5 +- .../fluo/app/observers/QueryResultObserver.java | 37 +- .../pcj/fluo/app/observers/TripleObserver.java | 15 +- .../indexing/pcj/fluo/app/query/FluoQuery.java | 84 ++-- .../fluo/app/query/FluoQueryMetadataDAO.java | 68 +-- .../pcj/fluo/integration/KafkaExportIT.java | 25 +- .../org/apache/rya/shell/RyaAdminCommands.java | 3 +- pom.xml | 74 ++- 83 files changed, 3619 insertions(+), 824 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/pom.xml ---------------------------------------------------------------------- diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml index 65ef381..a683507 100644 --- a/common/rya.api/pom.xml +++ b/common/rya.api/pom.xml @@ -32,6 +32,10 @@ under the License. <dependencies> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> <groupId>org.calrissian.mango</groupId> <artifactId>mango-core</artifactId> </dependency> @@ -60,7 +64,6 @@ under the License. <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>2.8.0</version> </dependency> <dependency> <groupId>com.github.stephenc.findbugs</groupId> @@ -71,10 +74,9 @@ under the License. 
<artifactId>jcip-annotations</artifactId> </dependency> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> + <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> - <version>2.24.0</version> - </dependency> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java new file mode 100644 index 0000000..2fdb77b --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatements.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +import org.openrdf.model.Statement; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Loads a set of statements into an instance of Rya. 
+ */ +@DefaultAnnotation(NonNull.class) +public interface LoadStatements { + + /** + * Loads a set of RDF statements into an instance of Rya. + * + * @param ryaInstanceName - The name of the Rya instance the statements will be loaded into. (not null) + * @param statements - An iterable of Statement objects that should be added to Rya. (not null) + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws RyaClientException Something caused the command to fail. + */ + public void loadStatements(String ryaInstanceName, Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java index c04bd86..1278193 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java @@ -44,6 +44,7 @@ public class RyaClient { private final AddUser addUser; private final RemoveUser removeUser; private final Uninstall uninstall; + private final LoadStatements loadStatements; private final LoadStatementsFile loadStatementsFile; private final ExecuteSparqlQuery executeSparqlQuery; @@ -64,6 +65,7 @@ public class RyaClient { final AddUser addUser, final RemoveUser removeUser, final Uninstall uninstall, + final LoadStatements loadStatements, final LoadStatementsFile loadStatementsFile, final ExecuteSparqlQuery executeSparqlQuery) { this.install = requireNonNull(install); @@ -79,6 +81,7 @@ public class RyaClient { this.addUser = requireNonNull(addUser); this.removeUser = requireNonNull(removeUser); this.uninstall = 
requireNonNull(uninstall); + this.loadStatements = requireNonNull(loadStatements); this.loadStatementsFile = requireNonNull(loadStatementsFile); this.executeSparqlQuery = requireNonNull(executeSparqlQuery); } @@ -105,10 +108,10 @@ public class RyaClient { public DeletePCJ getDeletePCJ() { return deletePcj; } - + /** * @return An instance of {@link CreatePeridodicPCJ} that is connected to a Rya Periodic Storage - */ + */ public CreatePeriodicPCJ getCreatePeriodicPCJ() { return createPeriodicPcj; } @@ -119,7 +122,7 @@ public class RyaClient { public DeletePeriodicPCJ getDeletePeriodicPCJ() { return deletePeriodicPcj; } - + /** * @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementallly * maintained by the Rya instance @@ -127,7 +130,7 @@ public class RyaClient { public ListIncrementalQueries getListIncrementalQueries() { return listIncrementalQueries; } - + /** * @return An instance of {@link BatchUpdatePCJ} that is connect to a Rya storage * if the Rya instance supports PCJ indexing. @@ -179,6 +182,13 @@ public class RyaClient { } /** + * @return An instance of {@link LoadStatements} that is connected to a Rya storage. + */ + public LoadStatements getLoadStatements() { + return loadStatements; + } + + /** * @return An instance of {@link LoadStatementsFile} that is connected to a Rya storage. 
*/ public LoadStatementsFile getLoadStatementsFile() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java index d19226e..49fc5d1 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaContext.java @@ -8,9 +8,9 @@ package org.apache.rya.api.resolver; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,11 +39,10 @@ import org.apache.rya.api.resolver.impl.RyaTypeResolverImpl; import org.apache.rya.api.resolver.impl.RyaURIResolver; import org.apache.rya.api.resolver.impl.ServiceBackedRyaTypeResolverMappings; import org.apache.rya.api.resolver.impl.ShortRyaTypeResolver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.openrdf.model.URI; import org.openrdf.model.vocabulary.XMLSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Date: 7/16/12 @@ -51,10 +50,10 @@ import org.openrdf.model.vocabulary.XMLSchema; */ public class RyaContext { - public Log logger = LogFactory.getLog(RyaContext.class); + public Logger logger = LoggerFactory.getLogger(RyaContext.class); - private Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>(); - private Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, 
RyaTypeResolver>(); + private final Map<URI, RyaTypeResolver> uriToResolver = new HashMap<URI, RyaTypeResolver>(); + private final Map<Byte, RyaTypeResolver> byteToResolver = new HashMap<Byte, RyaTypeResolver>(); private RyaTypeResolver defaultResolver = new CustomDatatypeResolver(); private RyaContext() { @@ -91,47 +90,51 @@ public class RyaContext { public synchronized static RyaContext getInstance() { return RyaContextHolder.INSTANCE; } - + //need to go from datatype->resolver - public RyaTypeResolver retrieveResolver(URI datatype) { - RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype); - if (ryaTypeResolver == null) return defaultResolver; + public RyaTypeResolver retrieveResolver(final URI datatype) { + final RyaTypeResolver ryaTypeResolver = uriToResolver.get(datatype); + if (ryaTypeResolver == null) { + return defaultResolver; + } return ryaTypeResolver; } //need to go from byte->resolver - public RyaTypeResolver retrieveResolver(byte markerByte) { - RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte); - if (ryaTypeResolver == null) return defaultResolver; + public RyaTypeResolver retrieveResolver(final byte markerByte) { + final RyaTypeResolver ryaTypeResolver = byteToResolver.get(markerByte); + if (ryaTypeResolver == null) { + return defaultResolver; + } return ryaTypeResolver; } - public byte[] serialize(RyaType ryaType) throws RyaTypeResolverException { - RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType()); + public byte[] serialize(final RyaType ryaType) throws RyaTypeResolverException { + final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType()); if (ryaTypeResolver != null) { return ryaTypeResolver.serialize(ryaType); } return null; } - public byte[][] serializeType(RyaType ryaType) throws RyaTypeResolverException { - RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType()); + public byte[][] serializeType(final RyaType ryaType) throws RyaTypeResolverException { + 
final RyaTypeResolver ryaTypeResolver = retrieveResolver(ryaType.getDataType()); if (ryaTypeResolver != null) { return ryaTypeResolver.serializeType(ryaType); } return null; } - public RyaType deserialize(byte[] bytes) throws RyaTypeResolverException { - RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]); + public RyaType deserialize(final byte[] bytes) throws RyaTypeResolverException { + final RyaTypeResolver ryaTypeResolver = retrieveResolver(bytes[bytes.length - 1]); if (ryaTypeResolver != null) { return ryaTypeResolver.deserialize(bytes); } return null; } - public void addRyaTypeResolverMapping(RyaTypeResolverMapping mapping) { + public void addRyaTypeResolverMapping(final RyaTypeResolverMapping mapping) { if (!uriToResolver.containsKey(mapping.getRyaDataType())) { if (logger.isDebugEnabled()) { logger.debug("addRyaTypeResolverMapping uri:[" + mapping.getRyaDataType() + "] byte:[" + mapping.getMarkerByte() + "] for mapping[" + mapping + "]"); @@ -143,14 +146,14 @@ public class RyaContext { } } - public void addRyaTypeResolverMappings(List<RyaTypeResolverMapping> mappings) { - for (RyaTypeResolverMapping mapping : mappings) { + public void addRyaTypeResolverMappings(final List<RyaTypeResolverMapping> mappings) { + for (final RyaTypeResolverMapping mapping : mappings) { addRyaTypeResolverMapping(mapping); } } - public RyaTypeResolver removeRyaTypeResolver(URI dataType) { - RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType); + public RyaTypeResolver removeRyaTypeResolver(final URI dataType) { + final RyaTypeResolver ryaTypeResolver = uriToResolver.remove(dataType); if (ryaTypeResolver != null) { if (logger.isDebugEnabled()) { logger.debug("Removing ryaType Resolver uri[" + dataType + "] + [" + ryaTypeResolver + "]"); @@ -161,8 +164,8 @@ public class RyaContext { return null; } - public RyaTypeResolver removeRyaTypeResolver(byte markerByte) { - RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte); + public 
RyaTypeResolver removeRyaTypeResolver(final byte markerByte) { + final RyaTypeResolver ryaTypeResolver = byteToResolver.remove(markerByte); if (ryaTypeResolver != null) { if (logger.isDebugEnabled()) { logger.debug("Removing ryaType Resolver byte[" + markerByte + "] + [" + ryaTypeResolver + "]"); @@ -174,8 +177,8 @@ public class RyaContext { } //transform range - public RyaRange transformRange(RyaRange range) throws RyaTypeResolverException { - RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType()); + public RyaRange transformRange(final RyaRange range) throws RyaTypeResolverException { + final RyaTypeResolver ryaTypeResolver = retrieveResolver(range.getStart().getDataType()); if (ryaTypeResolver != null) { return ryaTypeResolver.transformRange(range); } @@ -186,7 +189,7 @@ public class RyaContext { return defaultResolver; } - public void setDefaultResolver(RyaTypeResolver defaultResolver) { + public void setDefaultResolver(final RyaTypeResolver defaultResolver) { this.defaultResolver = defaultResolver; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index 6fe35e9..7f3901e 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -149,6 +149,11 @@ <artifactId>maven-shade-plugin</artifactId> <executions> <execution> + <id>map-reduce</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>map-reduce</shadedClassifierName> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java ---------------------------------------------------------------------- diff --git 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java new file mode 100644 index 0000000..9556bcf --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatements.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.LoadStatements; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.openrdf.model.Statement; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An Accumulo implementation of the {@link LoadStatements} command. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloLoadStatements extends AccumuloCommand implements LoadStatements { + private static final Logger log = Logger.getLogger(AccumuloLoadStatements.class); + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloLoadStatements}. + * + * @param connectionDetails - Details about the values that were used to create + * the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo + * that hosts Rya instance. 
(not null) + */ + public AccumuloLoadStatements(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public void loadStatements(final String ryaInstanceName, final Iterable<? extends Statement> statements) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(ryaInstanceName); + requireNonNull(statements); + + // Ensure the Rya Instance exists. + if(!instanceExists.exists(ryaInstanceName)) { + throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", ryaInstanceName)); + } + + Sail sail = null; + SailRepository sailRepo = null; + SailRepositoryConnection sailRepoConn = null; + + try { + // Get a Sail object that is connected to the Rya instance. + final AccumuloRdfConfiguration ryaConf = getAccumuloConnectionDetails().buildAccumuloRdfConfiguration(ryaInstanceName); + ryaConf.setFlush(false); //RYA-327 should address this hardcoded value. + sail = RyaSailFactory.getInstance(ryaConf); + + // Load the file. + sailRepo = new SailRepository(sail); + sailRepoConn = sailRepo.getConnection(); + sailRepoConn.add(statements); + + } catch (final SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + log.warn("Exception while loading:", e); + throw new RyaClientException("A problem connecting to the Rya instance named '" + ryaInstanceName + "' has caused the load to fail.", e); + } catch (final Exception e) { + log.warn("Exception while loading:", e); + throw new RyaClientException("A problem processing the RDF statements has caused the load into Rya instance named " + ryaInstanceName + "to fail.", e); + } finally { + // Shut it all down. 
+ if(sailRepoConn != null) { + try { + sailRepoConn.close(); + } catch (final RepositoryException e) { + log.warn("Couldn't close the SailRepoConnection that is attached to the Rya instance.", e); + } + } + if(sailRepo != null) { + try { + sailRepo.shutDown(); + } catch (final RepositoryException e) { + log.warn("Couldn't shut down the SailRepository that is attached to the Rya instance.", e); + } + } + if(sail != null) { + try { + sail.shutDown(); + } catch (final SailException e) { + log.warn("Couldn't shut down the Sail that is attached to the Rya instance.", e); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java index d9bf644..fcc712c 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -62,6 +62,7 @@ public class AccumuloRyaClientFactory { new AccumuloAddUser(connectionDetails, connector), new AccumuloRemoveUser(connectionDetails, connector), new AccumuloUninstall(connectionDetails, connector), + new AccumuloLoadStatements(connectionDetails, connector), new AccumuloLoadStatementsFile(connectionDetails, connector), new AccumuloExecuteSparqlQuery(connectionDetails, connector)); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java ---------------------------------------------------------------------- diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java index 40e2c77..4a15665 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java @@ -18,12 +18,11 @@ */ package org.apache.rya.indexing.pcj.matching; -import static java.util.Objects.requireNonNull; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -73,13 +72,11 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup private boolean init = false; public AccumuloIndexSetProvider(final Configuration conf) { - Preconditions.checkNotNull(conf); - this.conf = conf; + this.conf = Objects.requireNonNull(conf); } public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) { - Preconditions.checkNotNull(conf); - this.conf = conf; + this(conf); indexCache = indices; init = true; } @@ -155,9 +152,9 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup */ private List<ExternalTupleSet> getAccIndices() throws Exception { - requireNonNull(conf); - final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); - final Connector conn = requireNonNull(ConfigUtils.getConnector(conf)); + Objects.requireNonNull(conf); + final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); + final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf)); List<String> tables = null; if (conf instanceof RdfCloudTripleStoreConfiguration) { 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/api/pom.xml b/extras/periodic.notification/api/pom.xml index 9f62e73..01d5c60 100644 --- a/extras/periodic.notification/api/pom.xml +++ b/extras/periodic.notification/api/pom.xml @@ -1,15 +1,23 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. --> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.rya</groupId> @@ -23,16 +31,13 @@ <description>API for Periodic Notification Applications</description> <dependencies> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.8.0</version> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.openrdf.sesame</groupId> @@ -46,6 +51,13 @@ <groupId>org.apache.rya</groupId> <artifactId>rya.indexing.pcj</artifactId> </dependency> + + <!-- testing dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java 
b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java index ff08733..5a473d2 100644 --- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java +++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java @@ -36,20 +36,20 @@ public interface PeriodicNotificationClient extends AutoCloseable { * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor} * @param notification - notification to be added */ - public void addNotification(PeriodicNotification notification); - + void addNotification(PeriodicNotification notification); + /** * Deletes a notification from the {@link NotificationCoordinatorExecutor}. * @param notification - notification to be deleted */ - public void deleteNotification(BasicNotification notification); - + void deleteNotification(BasicNotification notification); + /** * Deletes a notification from the {@link NotificationCoordinatorExecutor}. 
* @param notificationId - id corresponding to the notification to be deleted */ - public void deleteNotification(String notificationId); - + void deleteNotification(String notificationId); + /** * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor} * @param id - Periodic Query id @@ -57,8 +57,5 @@ public interface PeriodicNotificationClient extends AutoCloseable { * @param delay - initial delay for starting periodic notifications * @param unit - time unit of delay and period */ - public void addNotification(String id, long period, long delay, TimeUnit unit); - - public void close(); - + void addNotification(String id, long period, long delay, TimeUnit unit); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java index bb438be..b022d3e 100644 --- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java +++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java @@ -31,50 +31,50 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification; /** * Implementation of {@link PeriodicNotificationClient} used to register new notification - * requests with the PeriodicQueryService. + * requests with the PeriodicQueryService. 
* */ public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient { - private KafkaProducer<String, CommandNotification> producer; - private String topic; - - public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) { + private final KafkaProducer<String, CommandNotification> producer; + private final String topic; + + public KafkaNotificationRegistrationClient(final String topic, final KafkaProducer<String, CommandNotification> producer) { this.topic = topic; this.producer = producer; } - + @Override - public void addNotification(PeriodicNotification notification) { + public void addNotification(final PeriodicNotification notification) { processNotification(new CommandNotification(Command.ADD, notification)); } @Override - public void deleteNotification(BasicNotification notification) { + public void deleteNotification(final BasicNotification notification) { processNotification(new CommandNotification(Command.DELETE, notification)); } @Override - public void deleteNotification(String notificationId) { + public void deleteNotification(final String notificationId) { processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId))); } @Override - public void addNotification(String id, long period, long delay, TimeUnit unit) { - Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); + public void addNotification(final String id, final long period, final long delay, final TimeUnit unit) { + final Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); processNotification(new CommandNotification(Command.ADD, notification)); } - - - private void processNotification(CommandNotification notification) { + + + private void processNotification(final CommandNotification notification) { producer.send(new ProducerRecord<String, 
CommandNotification>(topic, notification.getId(), notification)); } - + @Override public void close() { + // TODO scoping issue. If we're closing this producer, we should also create it - otherwise other classes may be using it + // or we shouldn't implement autocloseable. producer.close(); } - - } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java index 129bd6d..6db7b18 100644 --- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java +++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java @@ -25,12 +25,13 @@ import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.primitives.Bytes; @@ -42,7 +43,7 @@ import com.google.common.primitives.Bytes; */ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> { - private static final Logger log = Logger.getLogger(BindingSetSerDe.class); + 
private static final Logger log = LoggerFactory.getLogger(BindingSetSerDe.class); private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer(); private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8); @@ -60,7 +61,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin final int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE); final byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex); final byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length); - final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";")); + final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes, StandardCharsets.UTF_8).split(";")); return getBindingSet(varOrder, bsBytesNoVarOrder); } catch(final Exception e) { log.trace("Unable to deserialize BindingSet: " + bsBytes); @@ -75,7 +76,7 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin private byte[] getBytes(final VariableOrder varOrder, final BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException { final byte[] bsBytes = serializer.convert(bs, varOrder); final String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders()); - final byte[] varOrderBytes = varOrderString.getBytes("UTF-8"); + final byte[] varOrderBytes = varOrderString.getBytes(StandardCharsets.UTF_8); return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java 
b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java index 302e1be..13c789f 100644 --- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java +++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java @@ -18,7 +18,7 @@ */ package org.apache.rya.periodic.notification.serialization; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonParseException; /** * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s @@ -43,22 +44,23 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class); @Override - public CommandNotification deserialize(String topic, byte[] bytes) { - String json = null; + public CommandNotification deserialize(final String topic, final byte[] bytes) { try { - json = new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to deserialize notification for topic: " + topic); + final String json = new String(bytes, StandardCharsets.UTF_8); + return gson.fromJson(json, CommandNotification.class); + } catch (final JsonParseException e) { + LOG.warn("Unable to deserialize notification for topic: " + topic); + throw new RuntimeException(e); } - return gson.fromJson(json, CommandNotification.class); + } @Override - public byte[] serialize(String topic, CommandNotification command) { + public byte[] serialize(final String topic, final CommandNotification command) { 
try { - return gson.toJson(command).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to serialize notification: " + command + "for topic: " + topic); + return gson.toJson(command).getBytes(StandardCharsets.UTF_8); + } catch (final JsonParseException e) { + LOG.warn("Unable to serialize notification: " + command + "for topic: " + topic); throw new RuntimeException(e); } } @@ -67,10 +69,10 @@ public class CommandNotificationSerializer implements Serializer<CommandNotifica public void close() { // Do nothing. Nothing to close } - + @Override - public void configure(Map<String, ?> arg0, boolean arg1) { + public void configure(final Map<String, ?> arg0, final boolean arg1) { // Do nothing. Nothing to configure } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/pom.xml b/extras/periodic.notification/pom.xml index c49db73..7610b88 100644 --- a/extras/periodic.notification/pom.xml +++ b/extras/periodic.notification/pom.xml @@ -1,40 +1,43 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
--> - <modelVersion>4.0.0</modelVersion> - <artifactId>rya.periodic.notification.parent</artifactId> - - <parent> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> <groupId>org.apache.rya</groupId> <artifactId>rya.extras</artifactId> <version>3.2.12-incubating-SNAPSHOT</version> </parent> - - <name>Apache Rya Periodic Notification Parent</name> - <description>Parent POM for Rya Periodic Notification Projects</description> - - <packaging>pom</packaging> + + <artifactId>rya.periodic.notification.parent</artifactId> + + <name>Apache Rya Periodic Notification Parent</name> + <description>Parent POM for Rya Periodic Notification Projects</description> + + <packaging>pom</packaging> <modules> <module>api</module> <module>service</module> + <module>twill</module> + <module>twill.yarn</module> <module>tests</module> </modules> - </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/pom.xml b/extras/periodic.notification/service/pom.xml index 2e61733..18aef13 100644 --- a/extras/periodic.notification/service/pom.xml +++ b/extras/periodic.notification/service/pom.xml @@ -1,15 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. 
--> <parent> <groupId>org.apache.rya</groupId> <artifactId>rya.periodic.notification.parent</artifactId> @@ -22,42 +31,20 @@ <description>Notifications for Rya Periodic Service</description> <dependencies> - + <!-- compile dependencies --> <dependency> - <groupId>org.apache.twill</groupId> - <artifactId>twill-api</artifactId> - <version>0.11.0</version> - </dependency> - <dependency> - <groupId>org.apache.twill</groupId> - <artifactId>twill-yarn</artifactId> - <version>0.11.0</version> - <exclusions> - <exclusion> - <artifactId>kafka_2.10</artifactId> - <groupId>org.apache.kafka</groupId> - </exclusion> - </exclusions> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>2.8.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.fluo</groupId> <artifactId>fluo-api</artifactId> </dependency> <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-core</artifactId> - </dependency> - <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.indexing</artifactId> </dependency> @@ -71,13 +58,22 @@ </dependency> <dependency> <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.app</artifactId> + <artifactId>rya.periodic.notification.api</artifactId> </dependency> + + <!-- runtime dependencies --> <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.notification.api</artifactId> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-core</artifactId> + <scope>runtime</scope> </dependency> + <!-- testing dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java index 92a7d18..79abe2f 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java @@ -18,7 +18,10 @@ */ package org.apache.rya.periodic.notification.application; -import org.apache.log4j.Logger; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.BindingSetRecord; @@ -30,6 +33,8 @@ import org.apache.rya.periodic.notification.processor.NotificationProcessorExecu import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; import org.openrdf.query.algebra.evaluation.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -68,36 +73,40 @@ import com.google.common.base.Preconditions; * the period at which the user wants to receive updates, and the time unit. 
The * following query requests all observations that occurred within the last * minute and requests updates every 15 seconds. It also performs a count on - * those observations. <br> - * <br> - * <li>prefix function: http://org.apache.rya/function# - * <li>"prefix time: http://www.w3.org/2006/time# - * <li>"select (count(?obs) as ?total) where { - * <li>"Filter(function:periodic(?time, 1, .25, time:minutes)) - * <li>"?obs uri:hasTime ?time. - * <li>"?obs uri:hasId ?id } - * <li> + * those observations. + * <p> + * <pre> + * PREFIX function: http://org.apache.rya/function# + * PREFIX time: http://www.w3.org/2006/time# + * SELECT (count(?obs) as ?total) WHERE { + * FILTER (function:periodic(?time, 1, .25, time:minutes)) + * ?obs uri:hasTime ?time. + * ?obs uri:hasId ?id + * } + * </pre> */ public class PeriodicNotificationApplication implements LifeCycle { - private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class); - private NotificationCoordinatorExecutor coordinator; - private KafkaNotificationProvider provider; - private PeriodicQueryPrunerExecutor pruner; - private NotificationProcessorExecutor processor; - private KafkaExporterExecutor exporter; + private static final Logger log = LoggerFactory.getLogger(PeriodicNotificationApplication.class); + private final NotificationCoordinatorExecutor coordinator; + private final KafkaNotificationProvider provider; + private final PeriodicQueryPrunerExecutor pruner; + private final NotificationProcessorExecutor processor; + private final KafkaExporterExecutor exporter; private boolean running = false; + private Optional<CompletableFuture<Void>> finished = Optional.empty(); + /** * Creates a PeriodicNotificationApplication - * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka + * @param provider - {@link KafkaNotificationProvider} that retrieves new Notification requests from Kafka * @param coordinator - {NotificationCoordinator} that 
manages PeriodicNotifications. * @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications * @param exporter - {@link KafkaExporterExecutor} that exports periodic results * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins */ - public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator, - NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) { + public PeriodicNotificationApplication(final KafkaNotificationProvider provider, final NotificationCoordinatorExecutor coordinator, + final NotificationProcessorExecutor processor, final KafkaExporterExecutor exporter, final PeriodicQueryPrunerExecutor pruner) { this.provider = Preconditions.checkNotNull(provider); this.coordinator = Preconditions.checkNotNull(coordinator); this.processor = Preconditions.checkNotNull(processor); @@ -115,18 +124,37 @@ public class PeriodicNotificationApplication implements LifeCycle { pruner.start(); exporter.start(); running = true; + finished = Optional.of(new CompletableFuture<>()); + } + } + + /** + * Blocks the current thread until another thread has called {@link #stop()}. 
+ * @throws ExecutionException + * @throws InterruptedException + * @throws IllegalStateException + */ + public void blockUntilFinished() throws ExecutionException, InterruptedException, IllegalStateException { + if(finished.isPresent()) { + finished.get().get(); + } else { + throw new IllegalStateException("Cannot block if the application has not been started yet"); } } @Override public void stop() { log.info("Stopping PeriodicNotificationApplication."); + if(!finished.isPresent()) { + throw new IllegalStateException("Cannot stop if the application has not been started yet"); + } provider.stop(); coordinator.stop(); processor.stop(); pruner.stop(); exporter.stop(); running = false; + finished.get().complete(null); } /** @@ -154,7 +182,7 @@ public class PeriodicNotificationApplication implements LifeCycle { * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins * @return this Builder for chaining method calls */ - public Builder setPruner(PeriodicQueryPrunerExecutor pruner) { + public Builder setPruner(final PeriodicQueryPrunerExecutor pruner) { this.pruner = pruner; return this; } @@ -164,12 +192,12 @@ public class PeriodicNotificationApplication implements LifeCycle { * @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka * @return this Builder for chaining method calls */ - public Builder setProvider(KafkaNotificationProvider provider) { + public Builder setProvider(final KafkaNotificationProvider provider) { this.provider = provider; return this; } - public Builder setProcessor(NotificationProcessorExecutor processor) { + public Builder setProcessor(final NotificationProcessorExecutor processor) { this.processor = processor; return this; } @@ -179,7 +207,7 @@ public class PeriodicNotificationApplication implements LifeCycle { * @param exporter for exporting periodic query results to Kafka * @return this Builder for chaining method calls */ - public Builder 
setExporter(KafkaExporterExecutor exporter) { + public Builder setExporter(final KafkaExporterExecutor exporter) { this.exporter = exporter; return this; } @@ -189,7 +217,7 @@ public class PeriodicNotificationApplication implements LifeCycle { * @param coordinator for managing and generating periodic notifications * @return this Builder for chaining method calls */ - public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) { + public Builder setCoordinator(final NotificationCoordinatorExecutor coordinator) { this.coordinator = coordinator; return this; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java index ff58979..9f0631d 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java @@ -18,35 +18,36 @@ */ package org.apache.rya.periodic.notification.application; +import java.util.Objects; import java.util.Properties; import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import com.google.common.base.Preconditions; /** * Configuration object for creating a {@link PeriodicNotificationApplication}. 
*/ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration { - public static String FLUO_APP_NAME = "fluo.app.name"; - public static String FLUO_TABLE_NAME = "fluo.table.name"; - public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static String NOTIFICATION_TOPIC = "kafka.notification.topic"; - public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id"; - public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id"; - public static String COORDINATOR_THREADS = "cep.coordinator.threads"; - public static String PRODUCER_THREADS = "cep.producer.threads"; - public static String EXPORTER_THREADS = "cep.exporter.threads"; - public static String PROCESSOR_THREADS = "cep.processor.threads"; - public static String PRUNER_THREADS = "cep.pruner.threads"; - + public static final String RYA_PERIODIC_PREFIX = "rya.periodic.notification."; + public static final String RYA_PCJ_PREFIX = "rya.pcj."; + public static final String FLUO_APP_NAME = RYA_PCJ_PREFIX +"fluo.app.name"; + public static final String FLUO_TABLE_NAME = RYA_PCJ_PREFIX + "fluo.table.name"; + public static final String KAFKA_BOOTSTRAP_SERVERS = RYA_PERIODIC_PREFIX + "kafka.bootstrap.servers"; + public static final String NOTIFICATION_TOPIC = RYA_PERIODIC_PREFIX + "kafka.topic"; + public static final String NOTIFICATION_GROUP_ID = RYA_PERIODIC_PREFIX + "kafka.group.id"; + public static final String NOTIFICATION_CLIENT_ID = RYA_PERIODIC_PREFIX + "kafka.client.id"; + public static final String COORDINATOR_THREADS = RYA_PERIODIC_PREFIX + "coordinator.threads"; + public static final String PRODUCER_THREADS = RYA_PERIODIC_PREFIX + "producer.threads"; + public static final String EXPORTER_THREADS = RYA_PERIODIC_PREFIX + "exporter.threads"; + public static final String PROCESSOR_THREADS = RYA_PERIODIC_PREFIX + "processor.threads"; + public static final String PRUNER_THREADS = RYA_PERIODIC_PREFIX + "pruner.threads"; + public 
PeriodicNotificationApplicationConfiguration() {} - + /** * Creates a PeriodicNotificationApplicationConfiguration object from a Properties file. This method assumes * that all values in the Properties file are Strings and that the Properties file uses the keys below. - * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example. * <br> * <ul> * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String. @@ -55,23 +56,22 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon * <li>"accumulo.password" - Accumulo password (required) * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance. Default is "rya_" * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required) - * <li>"fluo.app.name" - Name of Fluo Application (required) - * <li>"fluo.table.name" - Name of Fluo Table (required) - * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required) - * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications". - * <li>"kafka.notification.client.id" - Client Id for notification topic. Default is "consumer0" - * <li>"kafka.notification.group.id" - Group Id for notification topic. Default is "group0" - * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1. - * <li>"cep.producer.threads" - Number of threads used by producer. Default is 1. - * <li>"cep.exporter.threads" - Number of threads used by exporter. Default is 1. - * <li>"cep.processor.threads" - Number of threads used by processor. Default is 1. - * <li>"cep.pruner.threads" - Number of threads used by pruner. Default is 1. 
+ * <li>"rya.pcj.fluo.app.name" - Name of Fluo Application (required) + * <li>"rya.pcj.fluo.table.name" - Name of Fluo Table (required) + * <li>"rya.periodic.notification.kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required) + * <li>"rya.periodic.notification.kafka.topic" - Topic to which new Periodic Notifications are published. Default is "notifications". + * <li>"rya.periodic.notification.kafka.client.id" - Client Id for notification topic. Default is "consumer0" + * <li>"rya.periodic.notification.kafka.group.id" - Group Id for notification topic. Default is "group0" + * <li>"rya.periodic.notification.coordinator.threads" - Number of threads used by coordinator. Default is 1. + * <li>"rya.periodic.notification.producer.threads" - Number of threads used by producer. Default is 1. + * <li>"rya.periodic.notification.exporter.threads" - Number of threads used by exporter. Default is 1. + * <li>"rya.periodic.notification.processor.threads" - Number of threads used by processor. Default is 1. + * <li>"rya.periodic.notification.pruner.threads" - Number of threads used by pruner. Default is 1. 
* </ul> * <br> * @param props - Properties file containing Accumulo specific configuration parameters - * @return AccumumuloRdfConfiguration with properties set */ - public PeriodicNotificationApplicationConfiguration(Properties props) { + public PeriodicNotificationApplicationConfiguration(final Properties props) { super(fromProperties(props)); setFluoAppName(props.getProperty(FLUO_APP_NAME)); setFluoTableName(props.getProperty(FLUO_TABLE_NAME)); @@ -85,170 +85,170 @@ public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfCon setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1"))); setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1"))); } - + /** * Sets the name of the Fluo Application - * @param fluoAppName + * @param fluoAppName */ - public void setFluoAppName(String fluoAppName) { - set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName)); + public void setFluoAppName(final String fluoAppName) { + set(FLUO_APP_NAME, Objects.requireNonNull(fluoAppName)); } - + /** * Sets the name of the Fluo table * @param fluoTableName */ - public void setFluoTableName(String fluoTableName) { - set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); + public void setFluoTableName(final String fluoTableName) { + set(FLUO_TABLE_NAME, Objects.requireNonNull(fluoTableName)); } - + /** * Sets the Kafka bootstrap servers * @param bootStrapServers */ - public void setBootStrapServers(String bootStrapServers) { - set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers)); + public void setBootStrapServers(final String bootStrapServers) { + set(KAFKA_BOOTSTRAP_SERVERS, Objects.requireNonNull(bootStrapServers)); } - + /** * Sets the Kafka topic name for new notification requests * @param notificationTopic */ - public void setNotificationTopic(String notificationTopic) { - set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic)); + public void setNotificationTopic(final String 
notificationTopic) { + set(NOTIFICATION_TOPIC, Objects.requireNonNull(notificationTopic)); } - + /** * Sets the GroupId for new notification request topic * @param notificationGroupId */ - public void setNotificationGroupId(String notificationGroupId) { - set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId)); + public void setNotificationGroupId(final String notificationGroupId) { + set(NOTIFICATION_GROUP_ID, Objects.requireNonNull(notificationGroupId)); } - + /** * Sets the ClientId for the Kafka notification topic * @param notificationClientId */ - public void setNotificationClientId(String notificationClientId) { - set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId)); + public void setNotificationClientId(final String notificationClientId) { + set(NOTIFICATION_CLIENT_ID, Objects.requireNonNull(notificationClientId)); } - + /** * Sets the number of threads for the coordinator * @param threads */ - public void setCoordinatorThreads(int threads) { + public void setCoordinatorThreads(final int threads) { setInt(COORDINATOR_THREADS, threads); } - + /** * Sets the number of threads for the exporter * @param threads */ - public void setExporterThreads(int threads) { + public void setExporterThreads(final int threads) { setInt(EXPORTER_THREADS, threads); } - + /** * Sets the number of threads for the producer for reading new periodic notifications * @param threads */ - public void setProducerThreads(int threads) { + public void setProducerThreads(final int threads) { setInt(PRODUCER_THREADS, threads); } - + /** * Sets the number of threads for the bin pruner * @param threads */ - public void setPrunerThreads(int threads) { + public void setPrunerThreads(final int threads) { setInt(PRUNER_THREADS, threads); } - + /** * Sets the number of threads for the Notification processor * @param threads */ - public void setProcessorThreads(int threads) { + public void setProcessorThreads(final int threads) { setInt(PROCESSOR_THREADS, 
threads); } - + /** * @return name of the Fluo application */ public String getFluoAppName() { return get(FLUO_APP_NAME); } - + /** * @return name of the Fluo table */ public String getFluoTableName() { - return get(FLUO_TABLE_NAME); + return get(FLUO_TABLE_NAME); } - + /** * @return Kafka bootstrap servers */ public String getBootStrapServers() { - return get(KAFKA_BOOTSTRAP_SERVERS); + return get(KAFKA_BOOTSTRAP_SERVERS); } - + /** * @return notification topic */ public String getNotificationTopic() { return get(NOTIFICATION_TOPIC, "notifications"); } - + /** * @return Kafka GroupId for the notificaton topic */ public String getNotificationGroupId() { return get(NOTIFICATION_GROUP_ID, "group0"); } - + /** * @return Kafka ClientId for the notification topic */ public String getNotificationClientId() { return get(NOTIFICATION_CLIENT_ID, "consumer0"); } - + /** * @return the number of threads for the coordinator */ public int getCoordinatorThreads() { return getInt(COORDINATOR_THREADS, 1); } - + /** * @return the number of threads for the exporter */ public int getExporterThreads() { return getInt(EXPORTER_THREADS, 1); } - + /** * @return the number of threads for the notification producer */ public int getProducerThreads() { return getInt(PRODUCER_THREADS, 1); } - + /** * @return the number of threads for the bin pruner */ public int getPrunerThreads() { return getInt(PRUNER_THREADS, 1); } - + /** * @return number of threads for the processor */ public int getProcessorThreads() { return getInt(PROCESSOR_THREADS, 1); } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java index 771a4ab..fbc03f3 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java @@ -18,10 +18,13 @@ */ package org.apache.rya.periodic.notification.application; +import java.io.File; +import java.io.FileInputStream; import java.util.Optional; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -33,6 +36,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; @@ -51,6 +55,9 @@ import org.apache.rya.periodic.notification.registration.kafka.KafkaNotification import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; import org.openrdf.query.BindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Factory for creating a {@link PeriodicNotificationApplication}. @@ -59,82 +66,88 @@ public class PeriodicNotificationApplicationFactory { /** * Create a PeriodicNotificationApplication. 
- * @param props - Properties file that specifies the parameters needed to create the application + * @param conf - Configuration object that specifies the parameters needed to create the application * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results * @throws PeriodicApplicationException */ - public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException { - PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props); - Properties kafkaProps = getKafkaProperties(conf); + public static PeriodicNotificationApplication getPeriodicApplication(final PeriodicNotificationApplicationConfiguration conf) throws PeriodicApplicationException { + final Properties kafkaConsumerProps = getKafkaConsumerProperties(conf); + final Properties kafkaProducerProps = getKafkaProducerProperties(conf); - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); - BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); + final BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); + final BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); FluoClient fluo = null; try { - PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf); + final PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf); fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf); - NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications); + final NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications); addRegisteredNotices(coordinator, fluo.newSnapshot()); - KafkaExporterExecutor 
exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets); - PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins); - NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads()); - KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps); + final KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProducerProps, bindingSets); + final PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins); + final NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads()); + final KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaConsumerProps); return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter) .setProcessor(processor).setPruner(pruner).build(); } catch (AccumuloException | AccumuloSecurityException e) { throw new PeriodicApplicationException(e.getMessage()); - } + } } - - private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) { + + private static void addRegisteredNotices(final NotificationCoordinatorExecutor coord, final Snapshot sx) { coord.start(); - PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); + final PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); provider.processRegisteredNotifications(coord, sx); } - private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) { + private static NotificationCoordinatorExecutor getCoordinator(final int numThreads, final BlockingQueue<TimestampedNotification> notifications) { return new 
PeriodicNotificationCoordinatorExecutor(numThreads, notifications); } - private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) { - KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe()); + private static KafkaExporterExecutor getExporter(final int numThreads, final Properties props, final BlockingQueue<BindingSetRecord> bindingSets) { + final KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe()); return new KafkaExporterExecutor(producer, numThreads, bindingSets); } - private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads, - BlockingQueue<NodeBin> bins) { + private static PeriodicQueryPrunerExecutor getPruner(final PeriodicQueryResultStorage storage, final FluoClient fluo, final int numThreads, + final BlockingQueue<NodeBin> bins) { return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins); } - private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage, - BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, - int numThreads) { + private static NotificationProcessorExecutor getProcessor(final PeriodicQueryResultStorage periodicStorage, + final BlockingQueue<TimestampedNotification> notifications, final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets, + final int numThreads) { return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads); } - private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord, - Properties props) { + private static KafkaNotificationProvider getProvider(final int numThreads, final String topic, final 
NotificationCoordinatorExecutor coord, + final Properties props) { return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, numThreads); } - private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf) + private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(final PeriodicNotificationApplicationConfiguration conf) throws AccumuloException, AccumuloSecurityException { - Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers()); - Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword())); - String ryaInstance = conf.getTablePrefix(); + final Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers()); + final Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword())); + final String ryaInstance = conf.getTablePrefix(); return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance); } - - private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { - Properties kafkaProps = new Properties(); + + private static Properties getKafkaConsumerProperties(final PeriodicNotificationApplicationConfiguration conf) { + final Properties kafkaProps = new Properties(); kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers()); kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId()); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProps.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000"); // reduce this value to 30 seconds for the scenario where we subscribe before the topic exists. 
return kafkaProps; } + private static Properties getKafkaProducerProperties(final PeriodicNotificationApplicationConfiguration conf) { + final Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers()); + return kafkaProps; + } }
