Repository: incubator-rya Updated Branches: refs/heads/master 8acd24b5e -> 62de7c5d1
FluoQueryMetadataCache Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/6d2bfcbc Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/6d2bfcbc Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/6d2bfcbc Branch: refs/heads/master Commit: 6d2bfcbcc1fe68e74521724d6f5490a6b9c70038 Parents: 8acd24b Author: Caleb Meier <[email protected]> Authored: Thu Oct 26 14:53:56 2017 -0700 Committer: Caleb Meier <[email protected]> Committed: Tue Nov 21 07:30:45 2017 -0800 ---------------------------------------------------------------------- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 48 ++-- .../fluo/app/observers/BindingSetUpdater.java | 15 +- .../observers/ConstructQueryResultObserver.java | 11 +- .../fluo/app/observers/QueryResultObserver.java | 8 +- .../pcj/fluo/app/observers/TripleObserver.java | 5 +- .../fluo/app/query/FluoQueryMetadataCache.java | 241 +++++++++++++++++++ .../fluo/app/query/MetadataCacheSupplier.java | 44 ++++ .../app/query/FluoQueryMetadataCacheTest.java | 34 +++ .../pcj/fluo/integration/FluoLatencyIT.java | 169 +++++++++++++ .../pcj/fluo/test/base/KafkaExportITBase.java | 11 +- 10 files changed, 538 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index 01da2dc..fd624eb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -1,25 +1,16 @@ <?xml version="1.0" encoding="utf-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rya</groupId> @@ -41,12 +32,14 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf project. RYA-341 --> - <!-- <version>13.0</version> Overriding Rya's Guava version to be compatible with Fluo's required version. Alternative is relocation with shade. --> + <!-- Uncommment this block when rya.pcj.fluo.app becomes a leaf + project. RYA-341 --> + <!-- <version>13.0</version> Overriding Rya's Guava version to + be compatible with Fluo's required version. Alternative is relocation with + shade. --> </dependency> <!-- Rya Runtime Dependencies. --> @@ -75,7 +68,7 @@ <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> - + <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> @@ -87,6 +80,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index c0cfa1d..9e47132 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -36,8 +36,9 @@ import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; @@ -55,7 +56,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; public abstract class BindingSetUpdater extends AbstractObserver { private static final Logger log = Logger.getLogger(BindingSetUpdater.class); // DAO - protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); // Updaters private final JoinResultUpdater joinUpdater = new JoinResultUpdater(); @@ -117,9 +118,9 @@ public abstract class BindingSetUpdater extends AbstractObserver { } catch (final Exception e) { throw new RuntimeException("Could not process a Query node.", e); } - break; - - case CONSTRUCT: + break; + + case CONSTRUCT: final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId); try{ constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery); @@ -127,7 +128,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { throw new RuntimeException("Could not process a Query node.", e); } break; - + case FILTER: final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId); try { @@ -145,7 +146,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { throw new RuntimeException("Could not process a Join node.", e); } break; - + case PERIODIC_QUERY: final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId); try{ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java index 61e7244..09d9ede 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java @@ -29,6 +29,8 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; /** * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new @@ -40,6 +42,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; public class ConstructQueryResultObserver extends AbstractObserver { private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class); + protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); @Override public ObservedColumn getObservedColumn() { @@ -48,14 +51,14 @@ public class ConstructQueryResultObserver extends AbstractObserver { @Override public void process(TransactionBase tx, Bytes row, Column col) throws Exception { - + //Build row for parent that result will be written to BindingSetRow bsRow = BindingSetRow.make(row); String constructNodeId = bsRow.getNodeId(); String bsString= bsRow.getBindingSetString(); - String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString(); + String parentNodeId = queryDao.readMetadadataEntry(tx, constructNodeId, FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString(); String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString; - + //Get NodeType of the parent node NodeType parentType = NodeType.fromNodeId(parentNodeId).get(); //Get data for the ConstructQuery result @@ -63,5 +66,5 @@ public class ConstructQueryResultObserver extends AbstractObserver { //Write result to parent tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes); } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 9514932..78d0ec5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -35,7 +35,8 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporte import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,7 @@ import com.google.common.collect.ImmutableSet; public class QueryResultObserver extends AbstractObserver { private static final Logger log = LoggerFactory.getLogger(QueryResultObserver.class); - private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO(); - + protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); /** * Builders for each type of {@link IncrementalBindingSetExporter} we support. */ @@ -101,7 +101,7 @@ public class QueryResultObserver extends AbstractObserver { // Read the queryId from the row and get the QueryMetadata. final String queryId = row.split(NODEID_BS_DELIM)[0]; - final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId); + final QueryMetadata metadata = queryDao.readQueryMetadata(tx, queryId); // Read the Child Binding Set that will be exported. final Bytes valueBytes = tx.get(brow, col); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index 2d7f390..d6fd8bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -34,7 +34,8 @@ import org.apache.fluo.api.observer.AbstractObserver; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -55,7 +56,7 @@ public class TripleObserver extends AbstractObserver { private static final Logger log = LoggerFactory.getLogger(TripleObserver.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO(); + private static final FluoQueryMetadataCache QUERY_METADATA_DAO = MetadataCacheSupplier.getOrCreateCache(); private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter(); public TripleObserver() {} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java new file mode 100644 index 0000000..8adc40d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. + * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + + private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); + private final FluoQueryMetadataDAO dao; + private final Cache<String, CommonNodeMetadata> commonNodeMetadataCache; + private final Cache<String, Bytes> metadataCache; + private int capacity; + private int concurrencyLevel; + + /** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ + public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { + this.dao = dao; + commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); + metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); + this.capacity = capacity; + this.concurrencyLevel = concurrencyLevel; + } + + /** + * @return - capacity of this cache in terms of max number of entries + */ + public int getCapacity() { + return capacity; + } + + /** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ + public int getConcurrencyLevel() { + return concurrencyLevel; + } + + @Override + public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + + try { + checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId); + return dao.readStatementPatternMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.JOIN); + LOG.debug("Retrieving Metadata from Cache: {}.", nodeId); + return (JoinMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readJoinMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access JoinMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public FilterMetadata readFilterMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.FILTER); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (FilterMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readFilterMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access FilterMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public ProjectionMetadata readProjectionMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + checkArgument(type.isPresent() && type.get() == NodeType.PROJECTION); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + try { + return (ProjectionMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readProjectionMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access ProjectionMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public AggregationMetadata readAggregationMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.AGGREGATION); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (AggregationMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readAggregationMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access AggregationMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public ConstructQueryMetadata readConstructQueryMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.CONSTRUCT); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (ConstructQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readConstructQueryMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access ConstructQueryMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public PeriodicQueryMetadata readPeriodicQueryMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.PERIODIC_QUERY); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (PeriodicQueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readPeriodicQueryMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access PeriodicQueryMetadata for nodeId: " + nodeId, e); + } + } + + @Override + public QueryMetadata readQueryMetadata(SnapshotBase tx, String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + checkArgument(type.isPresent() && type.get() == NodeType.QUERY); + LOG.debug("Retrieving Metadata from Cache: {}", nodeId); + return (QueryMetadata) commonNodeMetadataCache.get(nodeId, new Callable<CommonNodeMetadata>() { + @Override + public CommonNodeMetadata call() throws Exception { + LOG.debug("Seeking Metadata from Fluo Table: {}.", nodeId); + return dao.readQueryMetadata(tx, nodeId); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access QueryMetadata for nodeId: " + nodeId, e); + } + } + + public Bytes readMetadadataEntry(SnapshotBase tx, String rowId, Column column) { + Optional<NodeType> type = NodeType.fromNodeId(rowId); + try { + checkArgument(type.isPresent() && type.get().getMetaDataColumns().contains(column)); + return metadataCache.get(getKey(rowId, column), new Callable<Bytes>() { + @Override + public Bytes call() throws Exception { + return tx.get(Bytes.of(rowId), column); + } + }); + } catch (Exception e) { + throw new RuntimeException("Unable to access Metadata Entry with rowId: " + rowId + " and column: " + column, e); + } + } + + private String getKey(String row, Column column) { + return row + ":" + column.getsQualifier() + ":" + column.getsQualifier(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java new file mode 100644 index 0000000..faab952 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java @@ -0,0 +1,44 @@ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCacheSupplier { + + private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class); + private static FluoQueryMetadataCache CACHE; + private static boolean initialized = false; + private static final int DEFAULT_CAPACITY = 10000; + private static final int DEFAULT_CONCURRENCY = 8; + + /** + * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the + * indicated capacity and concurrencyLevel if one is provided. + * + * @param capacity - capacity used to create a new cache + * @param concurrencyLevel - concurrencyLevel used to create a new cache + */ + public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) { + if (!initialized) { + LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity, + concurrencyLevel); + CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel); + initialized = true; + } else { + LOG.debug("Cache has already been initialized. Returning cache with capacity: {} and concurrencylevel: {}", + CACHE.getCapacity(), CACHE.getConcurrencyLevel()); + } + return CACHE; + } + + /** + * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it + * with a default size of 10000 entries and a default concurrency level of 8. + * + * @return - FluoQueryMetadataCache with default instance name and default capacity and concurrency + */ + public static FluoQueryMetadataCache getOrCreateCache() { + return getOrCreateCache(DEFAULT_CAPACITY, DEFAULT_CONCURRENCY); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java new file mode 100644 index 0000000..3df3708 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java @@ -0,0 +1,34 @@ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import org.apache.fluo.api.client.Transaction; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.junit.Test; +import org.mockito.Mockito; + +public class FluoQueryMetadataCacheTest { + + @Test + public void testCache() { + FluoQueryMetadataDAO mockDAO = Mockito.mock(FluoQueryMetadataDAO.class); + Transaction mockTx = Mockito.mock(Transaction.class); + String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + StatementPatternMetadata metadata = StatementPatternMetadata.builder(nodeId).setParentNodeId("parent") + .setStatementPattern("pattern").setVarOrder(new VariableOrder("xyz")).build(); + when(mockDAO.readStatementPatternMetadata(mockTx, nodeId)).thenReturn(metadata); + + FluoQueryMetadataCache cache = new FluoQueryMetadataCache(mockDAO, 20, 2); + + assertEquals(metadata, cache.readStatementPatternMetadata(mockTx, nodeId)); + + cache.readStatementPatternMetadata(mockTx, nodeId); + cache.readStatementPatternMetadata(mockTx, nodeId); + cache.readStatementPatternMetadata(mockTx, nodeId); + cache.readStatementPatternMetadata(mockTx, nodeId); + + Mockito.verify(mockDAO, Mockito.times(1)).readStatementPatternMetadata(mockTx, nodeId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java new file mode 100644 index 0000000..fabf512 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java @@ -0,0 +1,169 @@ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static java.util.Objects.requireNonNull; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; +import org.junit.BeforeClass; +import org.junit.Test; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.repository.sail.SailRepositoryConnection; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class FluoLatencyIT extends KafkaExportITBase { + private static ValueFactory vf; + private static DatatypeFactory dtf; + + @BeforeClass + public static void init() throws DatatypeConfigurationException { + vf = new ValueFactoryImpl(); + dtf = DatatypeFactory.newInstance(); + } + + @Test + public void resultsExported() throws Exception { + + final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { " + + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type"; + +// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { " +// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}"; + + try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + String pcjId = FluoQueryUtils.createNewPcjId(); + FluoConfiguration conf = super.getFluoConfiguration(); + new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient); + SailRepositoryConnection conn = super.getRyaSailRepository().getConnection(); + + long start = System.currentTimeMillis(); + int numReturned = 0; + int numObs = 10; + int numTypes = 5; + int numExpected = 0; + int increment = numObs*numTypes; + while (System.currentTimeMillis() - start < 60000) { + List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now()); + conn.add(statements); + numExpected += increment; + System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: " + + getNumFluoEntries(fluoClient)); + numReturned += readAllResults(pcjId).size(); + System.out + .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned)); +// FluoITHelper.printFluoTable(conf); + Thread.sleep(30000); + } + } + } + + /** + * Generates (numObservationsPerType x numTypes) statements of the form: + * + * <pre> + * urn:obs_n uri:hasTime zonedTime + * urn:obs_n uri:hasObsType typePrefix_m + * </pre> + * + * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by + * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes. + * + * @param numObservationsPerType - The quantity of observations per type to generate. + * @param numTypes - The number of types to generate observations for. + * @param typePrefix - The prefix to be used for the type literal in the statement. + * @param observationOffset - The offset to be used for determining the value of n in the above statements. + * @param zonedTime - The time to be used for all observations generated. + * @return A new list of all generated Statements. + */ + public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix, + final long observationOffset, final ZonedDateTime zonedTime) { + final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT); + final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time)); + final List<Statement> statements = Lists.newArrayList(); + + for (long i = 0; i < numObservationsPerType; i++) { + for (int j = 0; j < numTypes; j++) { + final long observationId = observationOffset + i * numTypes + j; + // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId; + // final String obsId = "urn:obs_" + observationId; + final String obsId = "urn:obs_" + String.format("%020d", observationId); + final String type = typePrefix + j; + // logger.info(obsId + " " + type + " " + litTime); + statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime)); + statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type))); + } + } + + return statements; + } + + private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception { + requireNonNull(pcjId); + + // Read all of the results from the Kafka topic. + final Set<VisibilityBindingSet> results = new HashSet<>(); + + try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + results.add(recordIterator.next().value()); + } + } + + return results; + } + + private int getNumAccEntries(String tableName) throws TableNotFoundException { + Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations()); + int count = 0; + for (Map.Entry<Key, Value> entry : scanner) { + count++; + } + return count; + } + + private int getNumFluoEntries(FluoClient client) { + Transaction tx = client.newTransaction(); + CellScanner scanner = tx.scanner().build(); + int count = 0; + for (RowColumnValue rcv : scanner) { + count++; + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/6d2bfcbc/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index 59fe54f..7b16dcf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -69,7 +69,6 @@ import org.apache.rya.rdftriplestore.RyaSailRepository; import org.apache.rya.sail.config.RyaSailFactory; import org.junit.After; import org.junit.Before; -import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; @@ -129,7 +128,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { final KafkaBindingSetExporterParameters kafkaParams = new KafkaBindingSetExporterParameters(exportParams); kafkaParams.setUseKafkaBindingSetExporter(true); kafkaParams.setKafkaBootStrapServers(BROKERHOST + ":" + BROKERPORT); - + final KafkaSubGraphExporterParameters kafkaConstructParams = new KafkaSubGraphExporterParameters(exportParams); kafkaConstructParams.setUseKafkaSubgraphExporter(true); @@ -262,7 +261,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { * If this test fails then its a testing environment issue, not with Rya. * Source: https://github.com/asmaier/mini-kafka */ - @Test +// @Test public void embeddedKafkaTest() throws Exception { // create topic final String topic = "testTopic"; @@ -339,9 +338,9 @@ public class KafkaExportITBase extends AccumuloExportITBase { // The PCJ Id is the topic name the results will be written to. return pcjId; } - + protected void loadData(final Collection<Statement> statements) throws Exception { - + requireNonNull(statements); final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); @@ -352,7 +351,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { // Wait for the Fluo application to finish computing the end result. super.getMiniFluo().waitForObservers(); - + } }
