http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java new file mode 100644 index 0000000..37993bf --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.benchmark.periodic; + + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.base.Objects; + +@Parameters(commandNames = { "periodic" }, commandDescription = "Run benchmark with a PeriodicQuery that uses Filter(function:periodic(?temporalVariable, <windowSize>, <updatePeriod>, <timeUnits>)). 
This requires the Rya Periodic Notification Twill YARN Application to be running in addition to the Rya PCJ Updater Incremental Join Application.") +public class PeriodicQueryCommand extends BenchmarkOptions { + + @Parameter(names = { "-pqw", "--periodic-query-window" }, description = "The window size, in --periodic-query-time-units, for returning query results.", required = true) + private double periodicQueryWindow; + + @Parameter(names = { "-pqp", "--periodic-query-period" }, description = "The period, in --periodic-query-time-units, for results of the windowed query to be returned.", required = true) + private double periodicQueryPeriod; + + @Parameter(names = { "-pqtu", "--periodic-query-time-units" }, description = "The unit in time (days,hours,minutes)", required = true) + private PeriodicQueryTimeUnits periodicQueryTimeUnits; + + @Parameter(names = { "-pqrt", "--periodic-query-registration-topic" }, description = "The kafka topic which periodic notification registration requests are published to. (typically 'notifications')", required = true) + private String periodicQueryRegistrationTopic; + + public enum PeriodicQueryTimeUnits { + days, hours, minutes; + } + + public PeriodicQueryTimeUnits getPeriodicQueryTimeUnits() { + return periodicQueryTimeUnits; + } + + public double getPeriodicQueryWindow() { + return periodicQueryWindow; + } + + public double getPeriodicQueryPeriod() { + return periodicQueryPeriod; + } + + public String getPeriodicQueryRegistrationTopic() { + return periodicQueryRegistrationTopic; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("periodicQueryWindow", periodicQueryWindow) + .add("periodicQueryPeriod", periodicQueryPeriod) + .add("periodicQueryTimeUnits", periodicQueryTimeUnits) + .add("periodicQueryRegistrationTopic", periodicQueryRegistrationTopic) + .toString() + super.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java new file mode 100644 index 0000000..472b649 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.benchmark.periodic; + +import com.beust.jcommander.Parameters; +import com.google.common.base.Objects; + +@Parameters(commandNames = { "projection" }, commandDescription = "Run benchmark with a simple projection query. 
This requires the Rya PCJ Updater Incremental Join Application to be running.") +public class ProjectionQueryCommand extends BenchmarkOptions { + + @Override + public String toString() { + return Objects.toStringHelper(this).toString() + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh b/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh new file mode 100644 index 0000000..a69b71d --- /dev/null +++ b/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# navigate to the project directory (the parent of the directory that holds this script) +PROJECT_HOME=$(dirname "$(cd "$(dirname "$0")" && pwd)") +cd "$PROJECT_HOME" || exit 1 + + +# run the program; expansions are quoted so install paths containing spaces survive word splitting +"$JAVA_HOME/bin/java" -cp ".:lib/*" \ + -Dlog4j.configuration=conf/log4j.properties \ + org.apache.rya.benchmark.periodic.KafkaLatencyBenchmark \ + @conf/common.options \ + periodic \ + @conf/periodic.options \ + "$@" http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh b/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh new file mode 100644 index 0000000..6039e60 --- /dev/null +++ b/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# navigate to the project directory (the parent of the directory that holds this script) +PROJECT_HOME=$(dirname "$(cd "$(dirname "$0")" && pwd)") +cd "$PROJECT_HOME" || exit 1 + + +# run the program; expansions are quoted so install paths containing spaces survive word splitting +"$JAVA_HOME/bin/java" -cp ".:lib/*" \ + -Dlog4j.configuration=conf/log4j.properties \ + org.apache.rya.benchmark.periodic.KafkaLatencyBenchmark \ + @conf/common.options \ + projection \ + @conf/projection.options \ + "$@" http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.export/export.client/conf/config.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/conf/config.xml b/extras/rya.export/export.client/conf/config.xml index 57787b1..f2a7fdd 100644 --- a/extras/rya.export/export.client/conf/config.xml +++ b/extras/rya.export/export.client/conf/config.xml @@ -16,18 +16,18 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <MergeToolConfiguration xmlns="http://mergeconfig"> - <parentHostname>10.63.8.102</parentHostname> - <parentUsername>SPEAR</parentUsername> - <parentPassword>spear</parentPassword> - <parentRyaInstanceName>spear_instance</parentRyaInstanceName> - <parentTablePrefix>asmith_demo_export_</parentTablePrefix> - <parentTomcatUrl>http://10.63.8.102:8080</parentTomcatUrl> + <parentHostname>10.10.10.100</parentHostname> + <parentUsername>accumuloUsername</parentUsername> + <parentPassword>accumuloPassword</parentPassword> + <parentRyaInstanceName>accumuloInstance</parentRyaInstanceName> + <parentTablePrefix>rya_demo_export_</parentTablePrefix> + <parentTomcatUrl>http://10.10.10.100:8080</parentTomcatUrl> <parentDBType>accumulo</parentDBType> <parentPort>1111</parentPort> - <childHostname>localhost</childHostname> + <childHostname>10.10.10.101</childHostname> <childRyaInstanceName>rya_demo_child</childRyaInstanceName> - <childTablePrefix>asmith_demo_export_</childTablePrefix> - <childTomcatUrl>http://localhost:8080</childTomcatUrl> + 
<childTablePrefix>rya_demo_export_</childTablePrefix> + <childTomcatUrl>http://10.10.10.101:8080</childTomcatUrl> <childDBType>mongo</childDBType> <childPort>27017</childPort> <mergePolicy>timestamp</mergePolicy> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java index d7a50a7..309cb1f 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java @@ -62,8 +62,8 @@ import com.google.common.base.Preconditions; */ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { - private String ryaInstance; - private Connector accumuloConn; + private final String ryaInstance; + private final Connector accumuloConn; private Authorizations auths; private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); @@ -75,10 +75,10 @@ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultSt * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance * @param ryaInstance - Rya Instance name for connecting to Rya */ - public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { + public AccumuloPeriodicQueryResultStorage(final Connector accumuloConn, final String ryaInstance) { this.accumuloConn = Preconditions.checkNotNull(accumuloConn); 
this.ryaInstance = Preconditions.checkNotNull(ryaInstance); - String user = accumuloConn.whoami(); + final String user = accumuloConn.whoami(); try { this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); } catch (AccumuloException | AccumuloSecurityException e) { @@ -87,21 +87,21 @@ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultSt } @Override - public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { + public String createPeriodicQuery(final String sparql) throws PeriodicQueryStorageException { Preconditions.checkNotNull(sparql); - String queryId = pcjIdFactory.nextId(); + final String queryId = pcjIdFactory.nextId(); return createPeriodicQuery(queryId, sparql); } - + @Override - public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException { + public String createPeriodicQuery(final String queryId, final String sparql) throws PeriodicQueryStorageException { Set<String> bindingNames; try { bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql); - } catch (MalformedQueryException e) { + } catch (final MalformedQueryException e) { throw new PeriodicQueryStorageException(e.getMessage()); } - List<String> varOrderList = new ArrayList<>(); + final List<String> varOrderList = new ArrayList<>(); varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId); varOrderList.addAll(bindingNames); createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList)); @@ -109,79 +109,88 @@ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultSt } @Override - public void createPeriodicQuery(String queryId, String sparql, VariableOrder order) throws PeriodicQueryStorageException { + public void createPeriodicQuery(final String queryId, final String sparql, final VariableOrder order) throws PeriodicQueryStorageException { Preconditions.checkNotNull(sparql); Preconditions.checkNotNull(queryId); 
Preconditions.checkNotNull(order); Preconditions.checkArgument(PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)), "periodicBinId binding name must occur first in VariableOrder."); - String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); - Set<VariableOrder> varOrders = new HashSet<>(); + final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); + final Set<VariableOrder> varOrders = new HashSet<>(); varOrders.add(order); try { pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); } } @Override - public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryId) throws PeriodicQueryStorageException { + public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(final String queryId) throws PeriodicQueryStorageException { try { return new PeriodicQueryStorageMetadata( pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId))); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); } } @Override - public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException { + public void addPeriodicQueryResults(final String queryId, final Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException { results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId), "BindingSet must contain periodBinId binding.")); try { pcjTables.addResults(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId), results); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); } } @Override - public void deletePeriodicQueryResults(String queryId, long binId) throws PeriodicQueryStorageException { - 
String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); + public void deletePeriodicQueryResults(final String queryId, final long binId) throws PeriodicQueryStorageException { + final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); + BatchDeleter deleter = null; try { - Text prefix = getRowPrefix(binId); - BatchDeleter deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig()); + final Text prefix = getRowPrefix(binId); + deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig()); deleter.setRanges(Collections.singleton(Range.prefix(prefix))); deleter.delete(); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); + } finally { + try { + if(deleter != null) { + deleter.close(); + } + } catch (final Exception e) { + throw new PeriodicQueryStorageException(e.getMessage()); + } } } - public void deletePeriodicQueryResults(String queryId) throws PeriodicQueryStorageException { + public void deletePeriodicQueryResults(final String queryId) throws PeriodicQueryStorageException { try { pcjTables.purgePcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); } } @Override - public void deletePeriodicQuery(String queryId) throws PeriodicQueryStorageException { + public void deletePeriodicQuery(final String queryId) throws PeriodicQueryStorageException { try { pcjTables.dropPcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(e.getMessage()); } } @Override - public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binId) + public CloseableIterator<BindingSet> listResults(final String queryId, final Optional<Long> binId) throws 
PeriodicQueryStorageException { requireNonNull(queryId); - String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); + final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); // Fetch the Variable Orders for the binding sets and choose one of // them. It // doesn't matter which one we choose because they all result in the @@ -199,15 +208,15 @@ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultSt } return new AccumuloValueBindingSetIterator(scanner); - } catch (Exception e) { + } catch (final Exception e) { throw new PeriodicQueryStorageException(String.format("PCJ Table does not exist for name '%s'.", tableName), e); } } - - private Text getRowPrefix(long binId) throws BindingSetConversionException { - QueryBindingSet bs = new QueryBindingSet(); + + private Text getRowPrefix(final long binId) throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); bs.addBinding(PeriodicQueryResultStorage.PeriodicBinId, new LiteralImpl(Long.toString(binId), XMLSchema.LONG)); - + return new Text(converter.convert(bs, new VariableOrder(PeriodicQueryResultStorage.PeriodicBinId))); } @@ -236,35 +245,35 @@ public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultSt } return periodicTables; } - + /** * Class for removing any aggregate variables from the ProjectionElementList * of the parsed SPARQL queries. This ensures that only non-aggregation * values are contained in the Accumulo row. The non-aggregation variables * are not updated while the aggregation variables are, so they are included in * the serialized BindingSet in the Accumulo Value field, which is overwritten - * if an entry with the same Key and different Value (updated aggregation) is + * if an entry with the same Key and different Value (updated aggregation) is * written to the table. 
* */ static class AggregateVariableRemover extends QueryModelVisitorBase<RuntimeException> { - + private Set<String> bindingNames; - - public Set<String> getNonAggregationVariables(String sparql) throws MalformedQueryException { - TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + + public Set<String> getNonAggregationVariables(final String sparql) throws MalformedQueryException { + final TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); bindingNames = te.getBindingNames(); te.visit(this); return bindingNames; } - + @Override - public void meet(ExtensionElem node) { + public void meet(final ExtensionElem node) { if(node.getExpr() instanceof AggregateOperatorBase) { bindingNames.remove(node.getName()); } } - + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.manual/src/site/markdown/pcj-updater.md ---------------------------------------------------------------------- diff --git a/extras/rya.manual/src/site/markdown/pcj-updater.md b/extras/rya.manual/src/site/markdown/pcj-updater.md index 8f3c27f..11cb560 100644 --- a/extras/rya.manual/src/site/markdown/pcj-updater.md +++ b/extras/rya.manual/src/site/markdown/pcj-updater.md @@ -211,14 +211,16 @@ Add the following entries under Observer properties in the # fluo.observer.0=com.foo.Observer1 # Can optionally have configuration key values # fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2 -fluo.observer.0=org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver -fluo.observer.1=org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver -fluo.observer.2=org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver -fluo.observer.3=org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver -fluo.observer.4=org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver -fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver 
-#fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver -fluo.observer.6=org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver,pcj.fluo.export.rya.enabled=true,pcj.fluo.export.rya.ryaInstanceName=rya_,pcj.fluo.export.rya.accumuloInstanceName=myAccumuloInstance,pcj.fluo.export.rya.zookeeperServers=zoo1;zoo2;zoo3,pcj.fluo.export.rya.exporterUsername=myUserName,pcj.fluo.export.rya.exporterPassword=myPassword,pcj.fluo.export.kafka.enabled=true,bootstrap.servers=myKafkaBroker:9092,key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer,value.serializer=org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer +fluo.observer.0=org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver +fluo.observer.1=org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver +fluo.observer.2=org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver +fluo.observer.3=org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver +fluo.observer.4=org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver +fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver +fluo.observer.6=org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver +fluo.observer.7=org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver +#fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver 
+fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver,pcj.fluo.export.rya.enabled=true,pcj.fluo.export.rya.ryaInstanceName=rya_,pcj.fluo.export.rya.fluo.application.name=rya_pcj_updater,pcj.fluo.export.rya.accumuloInstanceName=myAccumuloInstance,pcj.fluo.export.rya.zookeeperServers=zoo1;zoo2;zoo3;zoo4;zoo5,pcj.fluo.export.rya.exporterUsername=myUserName,pcj.fluo.export.rya.exporterPassword=myPassword,pcj.fluo.export.rya.bindingset.enabled=true,pcj.fluo.export.periodic.bindingset.enabled=true,pcj.fluo.export.kafka.subgraph.enabled=true,pcj.fluo.export.kafka.bindingset.enabled=true,bootstrap.servers=kafka1:9092 ``` Description of configuration keys for the http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index e2f3a22..01da2dc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -35,9 +35,7 @@ A Fluo implementation of Rya Precomputed Join Indexing. This module produces a jar that may be executed by the 'fluo' command line tool as a YARN job. </description> - <properties> - <kryo.version>3.0.3</kryo.version> - </properties> + <dependencies> <dependency> <groupId>org.slf4j</groupId> @@ -81,7 +79,6 @@ <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> - <version>${kryo.version}</version> </dependency> <!-- Testing dependencies. 
--> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 17ed158..2cc9f77 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; @@ -46,6 +45,8 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -58,7 +59,7 @@ import info.aduna.iteration.CloseableIteration; @DefaultAnnotation(NonNull.class) public class FilterResultUpdater { - private static final Logger log = Logger.getLogger(FilterResultUpdater.class); + private static final Logger log = LoggerFactory.getLogger(FilterResultUpdater.class); private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe(); @@ -101,14 +102,11 @@ public class FilterResultUpdater { checkNotNull(childBindingSet); checkNotNull(filterMetadata); - log.trace( - "Transaction ID: " + tx.getStartTimestamp() + "\n" + - "Filter Node ID: " + filterMetadata.getNodeId() + "\n" + - "Binding Set:\n" + childBindingSet + "\n"); + log.trace("Transaction ID: {}\nFilter Node ID: {}\nBinding Set:\n{}\n", tx.getStartTimestamp(), filterMetadata.getNodeId(), childBindingSet); // Parse the original query and find the Filter that represents filterId. final String sparql = filterMetadata.getFilterSparql(); - Filter filter = FilterSerializer.deserialize(sparql); + final Filter filter = FilterSerializer.deserialize(sparql); // Evaluate whether the child BindingSet satisfies the filter's condition. final ValueExpr condition = filter.getCondition(); @@ -120,7 +118,7 @@ public class FilterResultUpdater { // Serialize and emit BindingSet final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); - log.trace("Transaction ID: " + tx.getStartTimestamp() + "\n" + "New Binding Set: " + childBindingSet + "\n"); + log.trace("Transaction ID: {}\nNew Binding Set: {}\n", tx.getStartTimestamp(), childBindingSet); tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java index 4933d57..2da3e39 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java @@ -23,8 +23,6 @@ import java.util.Objects; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; -import com.google.common.base.Preconditions; - /** * Abstract class for generating span based notifications. A spanned notification * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span. @@ -43,7 +41,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation */ public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) { super(batchSize, task, column); - this.span = Preconditions.checkNotNull(span); + this.span = Objects.requireNonNull(span); } public AbstractSpanBatchInformation(Task task, Column column, Span span) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java index 3354fdc..989a8e5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java @@ -23,12 +23,9 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.openrdf.query.Binding; -import com.google.common.base.Preconditions; - /** * This class updates join results based on parameters specified for the join's * children. The join has two children, and for one child a VisibilityBindingSet @@ -66,9 +63,9 @@ public class JoinBatchInformation extends AbstractSpanBatchInformation { */ public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) { super(batchSize, task, column, span); - this.bs = Preconditions.checkNotNull(bs); - this.side = Preconditions.checkNotNull(side); - this.join = Preconditions.checkNotNull(join); + this.bs = Objects.requireNonNull(bs); + this.side = Objects.requireNonNull(side); + this.join = Objects.requireNonNull(join); } public JoinBatchInformation(Task task, Column column, Span span, VisibilityBindingSet bs, Side side, JoinType join) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java index 0c26d65..61b3aa2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java @@ -27,11 +27,12 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import 
org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.log4j.Logger; import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; @@ -39,8 +40,8 @@ import com.google.common.collect.Sets; * Incrementally exports SPARQL query results to Kafka topics. */ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { - - private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class); + + private static final Logger log = LoggerFactory.getLogger(KafkaBindingSetExporter.class); private final KafkaProducer<String, VisibilityBindingSet> producer; @@ -50,7 +51,7 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { * @param producer for sending result set alerts to a broker. (not null) Can be created and configured by * {@link KafkaBindingSetExporterFactory} */ - public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) { + public KafkaBindingSetExporter(final KafkaProducer<String, VisibilityBindingSet> producer) { super(); checkNotNull(producer, "Producer is required."); this.producer = producer; @@ -64,9 +65,6 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { checkNotNull(queryId); checkNotNull(result); try { - final String msg = "Out to Kafka topic: " + queryId + ", Result: " + result; - log.trace(msg); - // Send the result to the topic whose name matches the PCJ ID. 
final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(queryId, result); final Future<RecordMetadata> future = producer.send(rec); @@ -74,7 +72,7 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. future.get(); - log.debug("producer.send(rec) completed"); + log.debug("Producer successfully sent record with queryId: {} and visbilityBindingSet: \n{}", queryId, result); } catch (final Throwable e) { throw new ResultExportException("A result could not be exported to Kafka.", e); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java index b796a6f..c25cde6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java @@ -21,11 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import org.apache.fluo.api.observer.Observer.Context; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Optional; @@ -42,18 +43,19 @@ import com.google.common.base.Optional; * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); * </pre> - * + * * @see ProducerConfig */ public class KafkaBindingSetExporterFactory implements IncrementalResultExporterFactory { - private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class); + private static final Logger log = LoggerFactory.getLogger(KafkaBindingSetExporterFactory.class); + @Override - public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { final KafkaBindingSetExporterParameters exportParams = new KafkaBindingSetExporterParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaBindingSetExporter()); if (exportParams.getUseKafkaBindingSetExporter()) { + log.info("Exporter is enabled."); // Setup Kafka connection - KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig()); + final KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<>(exportParams.listAllConfig()); // Create the exporter final IncrementalBindingSetExporter exporter = new KafkaBindingSetExporter(producer); return Optional.of(exporter); 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java index 8686c85..2d25c35 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -26,8 +27,6 @@ import org.apache.fluo.api.observer.Observer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase; -import com.google.common.base.Preconditions; - /** * Provides read/write functions to the parameters map that is passed into an * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related @@ -46,7 +45,7 @@ public class KafkaExportParameterBase extends ParametersBase { * @param bootstrapServers - connect string for Kafka brokers */ public void setKafkaBootStrapServers(String bootstrapServers) { - params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Preconditions.checkNotNull(bootstrapServers)); + params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers)); } /** 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java index da26329..df0e387 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java @@ -26,37 +26,42 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.log4j.Logger; import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; - /** - * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application + * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application * */ public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter { private final KafkaProducer<String, RyaSubGraph> producer; - private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporter.class); + 
private static final Logger log = LoggerFactory.getLogger(KafkaRyaSubGraphExporter.class); + - public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> producer) { + /** + * + * @param producer - The producer used by this exporter. + */ + public KafkaRyaSubGraphExporter(final KafkaProducer<String, RyaSubGraph> producer) { checkNotNull(producer); this.producer = producer; } - + /** * Exports the RyaSubGraph to a Kafka topic equivalent to the result returned by {@link RyaSubGraph#getId()} * @param subgraph - RyaSubGraph exported to Kafka * @param contructID - rowID of result that is exported. Used for logging purposes. */ @Override - public void export(String constructID, RyaSubGraph subGraph) throws ResultExportException { + public void export(final String constructID, final RyaSubGraph subGraph) throws ResultExportException { checkNotNull(constructID); checkNotNull(subGraph); try { @@ -67,7 +72,7 @@ public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. 
future.get(); - log.debug("Producer successfully sent record with id: " + constructID + " and statements: " + subGraph.getStatements()); + log.debug("Producer successfully sent record with id: {} and statements: {}", constructID, subGraph.getStatements()); } catch (final Throwable e) { throw new ResultExportException("A result could not be exported to Kafka.", e); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java index 60e9294..e0c4190 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java @@ -19,11 +19,12 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka; */ import org.apache.fluo.api.observer.Observer.Context; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Optional; @@ -34,10 +35,10 @@ import com.google.common.base.Optional; */ public class KafkaRyaSubGraphExporterFactory implements IncrementalResultExporterFactory 
{ - private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class); + private static final Logger log = LoggerFactory.getLogger(KafkaRyaSubGraphExporterFactory.class); public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = "pcj.fluo.export.kafka.subgraph.enabled"; public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = "pcj.fluo.export.kafka.subgraph.serializer"; - + /** * Builds a {@link KafkaRyaSubGraphExporter}. * @param context - {@link Context} object used to pass configuration parameters @@ -46,12 +47,12 @@ public class KafkaRyaSubGraphExporterFactory implements IncrementalResultExporte * @throws ConfigurationException */ @Override - public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException { + public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { final KafkaSubGraphExporterParameters exportParams = new KafkaSubGraphExporterParameters(context.getObserverConfiguration().toMap()); - log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaSubgraphExporter()); + log.info("Exporter is enabled: {}", exportParams.getUseKafkaSubgraphExporter()); if (exportParams.getUseKafkaSubgraphExporter()) { // Setup Kafka connection - KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig()); + final KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig()); // Create the exporter final IncrementalRyaSubGraphExporter exporter = new KafkaRyaSubGraphExporter(producer); return Optional.of(exporter); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java 
---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java index d12233a..c67527d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -51,7 +50,7 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { @Override protected Kryo initialValue() { - Kryo kryo = new Kryo(); + final Kryo kryo = new Kryo(); return kryo; }; }; @@ -60,7 +59,7 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * Deserialize a VisibilityBindingSet using Kyro lib. Exporting results of queries. 
* If you don't want to use Kyro, here is an alternative: * return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null); - * + * * @param topic * ignored * @param data @@ -68,9 +67,9 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * @return deserialized instance of VisibilityBindingSet */ @Override - public VisibilityBindingSet deserialize(String topic, byte[] data) { - KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); - Input input = new Input(new ByteArrayInputStream(data)); + public VisibilityBindingSet deserialize(final String topic, final byte[] data) { + final KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + final Input input = new Input(new ByteArrayInputStream(data)); return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class); } @@ -78,7 +77,7 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * Ignored. Nothing to configure. */ @Override - public void configure(Map<String, ?> configs, boolean isKey) { + public void configure(final Map<String, ?> configs, final boolean isKey) { // Do nothing. } @@ -86,7 +85,7 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * Serialize a VisibilityBindingSet using Kyro lib. Exporting results of queries. 
* If you don't want to use Kyro, here is an alternative: * return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8); - * + * * @param topic * ignored * @param data @@ -94,13 +93,13 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * @return Serialized form of VisibilityBindingSet */ @Override - public byte[] serialize(String topic, VisibilityBindingSet data) { - KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); + public byte[] serialize(final String topic, final VisibilityBindingSet data) { + final KryoInternalSerializer internalSerializer = new KryoInternalSerializer(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final Output output = new Output(baos); internalSerializer.write(kryos.get(), output, data); output.flush(); - byte[] array = baos.toByteArray(); + final byte[] array = baos.toByteArray(); return array; } @@ -127,14 +126,12 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility * */ private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> { - private static final Logger log = Logger.getLogger(KryoVisibilityBindingSetSerializer.class); @Override - public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) { - log.debug("Serializer writing visBindingSet" + visBindingSet); + public void write(final Kryo kryo, final Output output, final VisibilityBindingSet visBindingSet) { output.writeString(visBindingSet.getVisibility()); // write the number count for the reader. 
output.writeInt(visBindingSet.size()); - for (Binding binding : visBindingSet) { + for (final Binding binding : visBindingSet) { output.writeString(binding.getName()); final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue()); final String valueString = ryaValue.getData(); @@ -145,19 +142,18 @@ public class KryoVisibilityBindingSetSerializer implements Serializer<Visibility } @Override - public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) { - log.debug("Serializer reading visBindingSet"); - String visibility = input.readString(); - int bindingCount = input.readInt(); - ArrayList<String> namesList = new ArrayList<String>(bindingCount); - ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount); + public VisibilityBindingSet read(final Kryo kryo, final Input input, final Class<VisibilityBindingSet> aClass) { + final String visibility = input.readString(); + final int bindingCount = input.readInt(); + final ArrayList<String> namesList = new ArrayList<String>(bindingCount); + final ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount); for (int i = bindingCount; i > 0; i--) { namesList.add(input.readString()); - String valueString = input.readString(); + final String valueString = input.readString(); final URI type = new URIImpl(input.readString()); valuesList.add(makeValue(valueString, type)); } - BindingSet bindingSet = new ListBindingSet(namesList, valuesList); + final BindingSet bindingSet = new ListBindingSet(namesList, valuesList); return new VisibilityBindingSet(bindingSet, visibility); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java index a87243e..901f351 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java @@ -30,10 +30,8 @@ import org.apache.fluo.api.observer.Observer.Context; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import com.google.common.base.Optional; @@ -45,7 +43,6 @@ public class RyaBindingSetExporterFactory implements IncrementalResultExporterFa @Override public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException { checkNotNull(context); - // Wrap the context's parameters for parsing. final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() ); @@ -64,7 +61,7 @@ public class RyaBindingSetExporterFactory implements IncrementalResultExporterFa // Setup Rya PCJ Storage. final String ryaInstanceName = params.getRyaInstanceName().get(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName); - + // Make the exporter. 
final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage); return Optional.of(exporter); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index e07c514..9514932 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; @@ -38,6 +37,8 @@ import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFact import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; @@ -46,14 +47,14 @@ import com.google.common.collect.ImmutableSet; * Performs incremental result exporting to the configured destinations. 
*/ public class QueryResultObserver extends AbstractObserver { - - private static final Logger log = Logger.getLogger(QueryResultObserver.class); - private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); - + + private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class); + private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO(); + /** * Builders for each type of {@link IncrementalBindingSetExporter} we support. */ - private static final ImmutableSet<IncrementalResultExporterFactory> factories = + private static final ImmutableSet<IncrementalResultExporterFactory> FACTORIES = ImmutableSet.<IncrementalResultExporterFactory>builder() .add(new RyaBindingSetExporterFactory()) .add(new KafkaBindingSetExporterFactory()) @@ -61,7 +62,7 @@ public class QueryResultObserver extends AbstractObserver { .add(new RyaSubGraphExporterFactory()) .add(new PeriodicBindingSetExporterFactory()) .build(); - + private ExporterManager exporterManager; @Override @@ -74,25 +75,25 @@ public class QueryResultObserver extends AbstractObserver { */ @Override public void init(final Context context) { - - ExporterManager.Builder managerBuilder = ExporterManager.builder(); - - for(final IncrementalResultExporterFactory builder : factories) { - try { - log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + final ExporterManager.Builder managerBuilder = ExporterManager.builder(); + + for(final IncrementalResultExporterFactory builder : FACTORIES) { + try { + log.debug("Attempting to build exporter from factory: {}", builder); final Optional<IncrementalResultExporter> exporter = builder.build(context); if(exporter.isPresent()) { + log.info("Adding exporter: {}", exporter.get()); managerBuilder.addIncrementalResultExporter(exporter.get()); } } catch (final IncrementalExporterFactoryException e) { log.error("Could not initialize a result exporter.", e); } } - + exporterManager = managerBuilder.build(); } - + 
@Override public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception { @@ -100,11 +101,11 @@ public class QueryResultObserver extends AbstractObserver { // Read the queryId from the row and get the QueryMetadata. final String queryId = row.split(NODEID_BS_DELIM)[0]; - final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId); + final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId); // Read the Child Binding Set that will be exported. final Bytes valueBytes = tx.get(brow, col); - + exporterManager.export(metadata.getQueryType(), metadata.getExportStrategies(), queryId, valueBytes); } @@ -112,7 +113,7 @@ public class QueryResultObserver extends AbstractObserver { public void close() { try { exporterManager.close(); - } catch (Exception e) { + } catch (final Exception e) { log.warn("Encountered problems closing the ExporterManager."); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index 6fc8e91..2d7f390 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -31,7 +31,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; 
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; @@ -41,6 +40,8 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.collect.Maps; @@ -51,7 +52,7 @@ import com.google.common.collect.Maps; * the new result is stored as a binding set for the pattern. */ public class TripleObserver extends AbstractObserver { - private static final Logger log = Logger.getLogger(TripleObserver.class); + private static final Logger log = LoggerFactory.getLogger(TripleObserver.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO(); @@ -68,9 +69,7 @@ public class TripleObserver extends AbstractObserver { public void process(final TransactionBase tx, final Bytes brow, final Column column) { // Get string representation of triple. 
final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow); - log.trace( - "Transaction ID: " + tx.getStartTimestamp() + "\n" + - "Rya Statement: " + ryaStatement + "\n"); + log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement); final String triple = IncUpdateDAO.getTripleString(ryaStatement); @@ -114,10 +113,8 @@ public class TripleObserver extends AbstractObserver { try { final Bytes valueBytes = BS_SERDE.serialize(visBindingSet); - log.trace( - "Transaction ID: " + tx.getStartTimestamp() + "\n" + - "Matched Statement Pattern: " + spID + "\n" + - "Binding Set: " + visBindingSet + "\n"); + log.trace("Transaction ID: {}\nMatched Statement Pattern: {}\nBinding Set: {}\n", + tx.getStartTimestamp(), spID, visBindingSet); tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes); } catch(final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index a1c7c00..acf9b39 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -81,7 +81,7 @@ public class FluoQuery { final Optional<PeriodicQueryMetadata> periodicQueryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, - final ImmutableMap<String, JoinMetadata> joinMetadata, + final ImmutableMap<String, JoinMetadata> joinMetadata, final ImmutableMap<String, 
AggregationMetadata> aggregationMetadata) { this.aggregationMetadata = requireNonNull(aggregationMetadata); this.queryMetadata = requireNonNull(queryMetadata); @@ -94,7 +94,7 @@ public class FluoQuery { this.joinMetadata = requireNonNull(joinMetadata); this.type = queryMetadata.getQueryType(); } - + /** * Returns the {@link QueryType} of this query * @return the QueryType of this query (either Construct or Projection} @@ -102,7 +102,7 @@ public class FluoQuery { public QueryType getQueryType() { return type; } - + /** * @return the unique id of this query */ @@ -116,66 +116,66 @@ public class FluoQuery { public QueryMetadata getQueryMetadata() { return queryMetadata; } - + /** * @param nodeId - node id of the query metadata * @return Optional containing the queryMetadata if it matches the specified nodeId */ - public Optional<QueryMetadata> getQueryMetadata(String nodeId) { + public Optional<QueryMetadata> getQueryMetadata(final String nodeId) { if(queryMetadata.getNodeId().equals(nodeId)) { return Optional.of(queryMetadata); } else { return Optional.absent(); } } - + /** * @return construct query metadata for generating subgraphs */ public Optional<ConstructQueryMetadata> getConstructQueryMetadata() { return constructMetadata; } - + /** * @param nodeId - node id of the ConstructMetadata * @return Optional containing the ConstructMetadata if it is present and has the given nodeId */ - public Optional<ConstructQueryMetadata> getConstructQueryMetadata(String nodeId) { + public Optional<ConstructQueryMetadata> getConstructQueryMetadata(final String nodeId) { if(constructMetadata.isPresent() && constructMetadata.get().getNodeId().equals(nodeId)) { return constructMetadata; } else { return Optional.absent(); } } - + /** * @param nodeId - id of the Projection metadata you want (not null) * @return projection metadata corresponding to give nodeId */ - public Optional<ProjectionMetadata> getProjectionMetadata(String nodeId) { + public Optional<ProjectionMetadata> 
getProjectionMetadata(final String nodeId) { return Optional.fromNullable(projectionMetadata.get(nodeId)); } - + /** * @return All of the projection metadata that is stored for the query */ public Collection<ProjectionMetadata> getProjectionMetadata() { return projectionMetadata.values(); } - + /** * @return All of the Periodic Query metadata that is stored for the query. */ public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() { return periodicQueryMetadata; } - + /** * @param nodeId - id of the PeriodicQueryMetadata * @return Optional containing the PeriodicQueryMetadata if it is present and has the given nodeId */ - public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(String nodeId) { - + public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(final String nodeId) { + if(periodicQueryMetadata.isPresent() && periodicQueryMetadata.get().getNodeId().equals(nodeId)) { return periodicQueryMetadata; } else { @@ -294,17 +294,17 @@ public class FluoQuery { builder.append(queryMetadata.toString()); builder.append("\n"); - + for(final ProjectionMetadata metadata : projectionMetadata.values()) { builder.append(metadata); builder.append("\n"); } - + if(constructMetadata.isPresent()) { builder.append( constructMetadata.get().toString() ); builder.append("\n"); } - + if(periodicQueryMetadata.isPresent()) { builder.append(periodicQueryMetadata.get()); builder.append("\n"); @@ -372,20 +372,20 @@ public class FluoQuery { public QueryMetadata.Builder getQueryBuilder() { return queryBuilder; } - + /** * @param nodeId - id of the QueryMetadata.Builder * @return Optional containing the QueryMetadata.Builder if it has the specified nodeId */ - public Optional<QueryMetadata.Builder> getQueryBuilder(String nodeId) { + public Optional<QueryMetadata.Builder> getQueryBuilder(final String nodeId) { if(queryBuilder.getNodeId().equals(nodeId)) { return Optional.of(queryBuilder); } else { return Optional.absent(); } - + } - + /** * Sets the {@link 
ProjectionMetadata.Builder} that is used by this builder. * @@ -401,11 +401,11 @@ public class FluoQuery { /** * @return The ProjectionMetadata builder if one has been set. */ - public Optional<ProjectionMetadata.Builder> getProjectionBuilder(String nodeId) { + public Optional<ProjectionMetadata.Builder> getProjectionBuilder(final String nodeId) { requireNonNull(nodeId); return Optional.fromNullable( projectionBuilders.get(nodeId) ); } - + /** * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder. * @@ -421,21 +421,21 @@ public class FluoQuery { * @param id of the ConstructQueryMetadata.Builder * @return Optional containing the ConstructQueryMetadata.Builder if it has been set and has the given nodeId. */ - public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder(String nodeId) { + public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder(final String nodeId) { if(constructBuilder != null && constructBuilder.getNodeId().equals(nodeId)) { return Optional.of(constructBuilder); } else { return Optional.absent(); } } - + /** * @return The Construct Query metadata builder if one has been set. */ public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() { return Optional.fromNullable( constructBuilder ); } - + /** * Adds a new {@link StatementPatternMetadata.Builder} to this builder. @@ -505,7 +505,7 @@ public class FluoQuery { requireNonNull(nodeId); return Optional.fromNullable( joinBuilders.get(nodeId) ); } - + /** * Get an Aggregate builder from this builder. * @@ -528,7 +528,7 @@ public class FluoQuery { this.aggregationBuilders.put(aggregationBuilder.getNodeId(), aggregationBuilder); return this; } - + /** * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder. 
* @@ -549,33 +549,33 @@ public class FluoQuery { public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder() { return Optional.fromNullable( periodicQueryBuilder); } - + /** * @param - id of the PeriodicQueryMetadata.Builder * @return - Optional containing the PeriodicQueryMetadata.Builder if one has been set and it has the given nodeId */ - public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder(String nodeId) { - + public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder(final String nodeId) { + if(periodicQueryBuilder != null && periodicQueryBuilder.getNodeId().equals(nodeId)) { return Optional.of(periodicQueryBuilder); } else { return Optional.absent(); } } - + /** * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException */ public FluoQuery build() throws UnsupportedQueryException { checkArgument((projectionBuilders.size() > 0 || constructBuilder != null)); - - Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder(); + + final Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder(); PeriodicQueryMetadata periodicQueryMetadata = null; if(optionalPeriodicQueryBuilder.isPresent()) { periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build(); } - + final ImmutableMap.Builder<String, ProjectionMetadata> projectionMetadata = ImmutableMap.builder(); for(final Entry<String, ProjectionMetadata.Builder> entry : projectionBuilders.entrySet()) { projectionMetadata.put(entry.getKey(), entry.getValue().build()); @@ -601,8 +601,8 @@ public class FluoQuery { aggregateMetadata.put(entry.getKey(), entry.getValue().build()); } - QueryMetadata qMetadata = queryBuilder.build(); - + final QueryMetadata qMetadata = queryBuilder.build(); + if(constructBuilder != null) { if(periodicQueryMetadata != null) { throw new 
UnsupportedQueryException("Queries containing sliding window filters and construct query patterns are not supported."); @@ -612,10 +612,10 @@ public class FluoQuery { if(aggregationBuilders.size() > 0 && qMetadata.getQueryType() == QueryType.PROJECTION && qMetadata.getExportStrategies().contains(ExportStrategy.RYA)) { throw new UnsupportedQueryException("Exporting to Rya PCJ tables is currently not supported for queries containing aggregations."); } - - return new FluoQuery(queryBuilder.build(), projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + + return new FluoQuery(qMetadata, projectionMetadata.build(), Optional.absent(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } - + } } } \ No newline at end of file