http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java new file mode 100644 index 0000000..a772b83 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.demo; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.openrdf.model.Statement; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Snapshot; +import io.fluo.api.data.Bytes; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import 
mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * Demonstrates how historically added Rya statements that are stored within the core + * Rya tables join with newly streamed statements in the Fluo application. + */ +public class FluoAndHistoricPcjsDemo implements Demo { + private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class); + + // Employees + private static final RyaURI alice = new RyaURI("http://Alice"); + private static final RyaURI bob = new RyaURI("http://Bob"); + private static final RyaURI charlie = new RyaURI("http://Charlie"); + private static final RyaURI frank = new RyaURI("http://Frank"); + + // Patrons + private static final RyaURI david = new RyaURI("http://David"); + private static final RyaURI eve = new RyaURI("http://Eve"); + private static final RyaURI george = new RyaURI("http://George"); + + // Other People + private static final RyaURI henry = new RyaURI("http://Henry"); + private static final RyaURI irene = new RyaURI("http://Irene"); + private static final RyaURI justin = new RyaURI("http://Justin"); + private static final RyaURI kristi = new RyaURI("http://Kristi"); + private static final RyaURI luke = new RyaURI("http://Luke"); + private static final RyaURI manny = new RyaURI("http://Manny"); + private static final RyaURI nate = new RyaURI("http://Nate"); + private static final RyaURI olivia = new RyaURI("http://Olivia"); + private static final RyaURI paul = new RyaURI("http://Paul"); + private static final RyaURI ross = new RyaURI("http://Ross"); + private static final RyaURI sally = new RyaURI("http://Sally"); + private static final RyaURI tim = new RyaURI("http://Tim"); + + // Places + private static final RyaURI coffeeShop = new RyaURI("http://CoffeeShop"); + private static final RyaURI burgerShop = new RyaURI("http://BurgerShop"); + private static final RyaURI cupcakeShop = new RyaURI("http://cupcakeShop"); + + // Verbs + private static final RyaURI talksTo = new RyaURI("http://talksTo"); + private static final RyaURI worksAt = new RyaURI("http://worksAt"); + + /** + * Used to pause the demo waiting for the presenter to hit the Enter key. + */ + private final java.util.Scanner keyboard = new java.util.Scanner(System.in); + + @Override + public void execute( + final MiniAccumuloCluster accumulo, + final Connector accumuloConn, + final String ryaTablePrefix, + final RyaSailRepository ryaRepo, + final RepositoryConnection ryaConn, + final MiniFluo fluo, + final FluoClient fluoClient) throws DemoExecutionException { + log.setLevel(Level.INFO); + + // 1. Introduce some RDF Statements that we are going to start with and + // pause so the presenter can introduce this information to the audience.
+ final Set<RyaStatement> relevantHistoricStatements = Sets.newHashSet( + new RyaStatement(eve, talksTo, charlie), + new RyaStatement(david, talksTo, alice), + new RyaStatement(alice, worksAt, coffeeShop), + new RyaStatement(bob, worksAt, coffeeShop)); + + log.info("We add some Statements that are relevant to the query we will compute:"); + prettyLogStatements(relevantHistoricStatements); + waitForEnter(); + + log.info("We also add some more Statements that aren't related to the query we will compute:"); + final Set<RyaStatement> otherHistoricStatements = Sets.newHashSet( + new RyaStatement(henry, worksAt, burgerShop), + new RyaStatement(irene, worksAt, burgerShop), + new RyaStatement(justin, worksAt, burgerShop), + new RyaStatement(kristi, worksAt, burgerShop), + new RyaStatement(luke, worksAt, burgerShop), + new RyaStatement(manny, worksAt, cupcakeShop), + new RyaStatement(nate, worksAt, cupcakeShop), + new RyaStatement(olivia, worksAt, cupcakeShop), + new RyaStatement(paul, worksAt, cupcakeShop), + new RyaStatement(ross, worksAt, cupcakeShop), + new RyaStatement(henry, talksTo, irene), + new RyaStatement(henry, talksTo, justin), + new RyaStatement(kristi, talksTo, irene), + new RyaStatement(luke, talksTo, irene), + new RyaStatement(sally, talksTo, paul), + new RyaStatement(sally, talksTo, ross), + new RyaStatement(sally, talksTo, kristi), + new RyaStatement(tim, talksTo, nate), + new RyaStatement(tim, talksTo, paul), + new RyaStatement(tim, talksTo, kristi)); + + log.info("These statements will also be inserted into the core Rya tables:"); + prettyLogStatements(otherHistoricStatements); + waitForEnter(); + + // 2. Load the statements into the core Rya tables. + log.info("Loading the historic statements into Rya..."); + loadDataIntoRya(ryaConn, relevantHistoricStatements); + loadDataIntoRya(ryaConn, otherHistoricStatements); + log.info(""); + + // 3. Introduce the query that we're going to load into Fluo and pause so that the + // presenter may show what they believe the expected output should be. + final String sparql = + "SELECT ?patron ?employee " + + "WHERE { " + + "?patron <http://talksTo> ?employee. " + + "?employee <http://worksAt> <http://CoffeeShop>. " + + "}"; + + log.info("The following SPARQL query will be loaded into the Fluo application for incremental updates:"); + prettyLogSparql(sparql); + waitForEnter(); + + // 4. Write the query to Fluo and import the historic matches. Wait for the app to finish exporting results. + log.info("Telling Fluo to maintain the query and import the historic Statement Pattern matches."); + try { + new CreatePcj().withRyaIntegration(fluoClient, ryaTablePrefix, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e); + } + + log.info("Waiting for the Fluo application to finish exporting the initial results..."); + fluo.waitForObservers(); + log.info("Historic result exporting finished."); + log.info(""); + + // 5. Show that the Fluo app exported the results to the PCJ table in Accumulo. + final String pcjTableName = getPcjTableName(fluoClient, sparql); + + log.info("The following Binding Sets were exported to the '" + pcjTableName + "' table in Accumulo:"); + Multimap<String, BindingSet> pcjResults = loadPcjResults(accumuloConn, pcjTableName); + prettyLogPcjResults(pcjResults); + waitForEnter(); + + // 6.
Introduce some new Statements that we will stream into the Fluo app. + final RyaStatement newLeft = new RyaStatement(george, talksTo, frank); + final RyaStatement newRight = new RyaStatement(frank, worksAt, coffeeShop); + final RyaStatement joinLeft = new RyaStatement(eve, talksTo, bob); + final RyaStatement joinRight = new RyaStatement(charlie, worksAt, coffeeShop); + + final Set<RyaStatement> relevantStreamedStatements = Sets.newHashSet( + newLeft, + newRight, + joinLeft, + joinRight); + + log.info("We stream these relevant Statements into Fluo and the core Rya tables:"); + log.info(prettyFormat(newLeft) + " - Part of a new result"); + log.info(prettyFormat(newRight) + " - Other part of a new result"); + log.info(prettyFormat(joinLeft) + " - Joins with a historic <http://talksTo> statement"); + log.info(prettyFormat(joinRight) + " - Joins with a historic <http://worksAt> statement"); + waitForEnter(); + + final Set<RyaStatement> otherStreamedStatements = Sets.newHashSet( + new RyaStatement(alice, talksTo, tim), + new RyaStatement(bob, talksTo, tim), + new RyaStatement(charlie, talksTo, tim), + new RyaStatement(frank, talksTo, tim), + new RyaStatement(david, talksTo, tim), + new RyaStatement(eve, talksTo, sally), + new RyaStatement(george, talksTo, sally), + new RyaStatement(henry, talksTo, sally), + new RyaStatement(irene, talksTo, sally), + new RyaStatement(justin, talksTo, sally), + new RyaStatement(kristi, talksTo, manny), + new RyaStatement(luke, talksTo, manny), + new RyaStatement(manny, talksTo, paul), + new RyaStatement(nate, talksTo, manny), + new RyaStatement(olivia, talksTo, manny), + new RyaStatement(paul, talksTo, kristi), + new RyaStatement(ross, talksTo, kristi), + new RyaStatement(sally, talksTo, kristi), + new RyaStatement(olivia, talksTo, kristi), + new RyaStatement(olivia, talksTo, kristi)); + + log.info("We also stream these irrelevant Statements into Fluo and the core Rya tables:"); + prettyLogStatements(otherStreamedStatements); + waitForEnter(); + + // 7. Insert the new triples into the core Rya tables and the Fluo app. + loadDataIntoRya(ryaConn, relevantStreamedStatements); + loadDataIntoFluo(fluoClient, relevantStreamedStatements); + + log.info("Waiting for the Fluo application to finish exporting the newly streamed results..."); + fluo.waitForObservers(); + log.info("Streamed result exporting finished."); + log.info(""); + + // 8. Show the new results have been exported to the PCJ table in Accumulo. + log.info("The following Binding Sets were exported to the '" + pcjTableName + "' table in Accumulo:"); + pcjResults = loadPcjResults(accumuloConn, pcjTableName); + prettyLogPcjResults(pcjResults); + log.info(""); + } + + private void waitForEnter() { + log.info(""); + log.info("Press [Enter] to continue the demo."); + keyboard.nextLine(); + } + + private static void prettyLogSparql(final String sparql) { + try { + // Pretty print. + final String[] lines = prettyFormatSparql(sparql); + for(final String line : lines) { + log.info(line); + } + } catch (final Exception e) { + // Pretty print failed, so ugly print instead.
+ log.info(sparql); + } + } + + private static void loadDataIntoFluo(final FluoClient fluoClient, final Set<RyaStatement> statements) { + final InsertTriples insertTriples = new InsertTriples(); + for(final RyaStatement statement : statements) { + insertTriples.insert(fluoClient, statement); + } + } + + private static String prettyFormat(final RyaStatement statement) { + final RyaURI s = statement.getSubject(); + final RyaURI p = statement.getPredicate(); + final RyaType o = statement.getObject(); + return "<" + s.getData() + "> <" + p.getData() + "> <" + o.getData() + ">"; + } + + private static void prettyLogStatements(final Set<RyaStatement> statements) { + for(final RyaStatement statement : statements) { + log.info(" " + prettyFormat(statement)); + } + } + + private static String[] prettyFormatSparql(final String sparql) throws Exception { + final SPARQLParser parser = new SPARQLParser(); + final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer(); + final ParsedQuery pq = parser.parseQuery(sparql, null); + final String prettySparql = renderer.render(pq); + return StringUtils.split(prettySparql, '\n'); + } + + private static void loadDataIntoRya(final RepositoryConnection ryaConn, final Set<RyaStatement> statements) throws DemoExecutionException { + for(final RyaStatement ryaStatement : statements) { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + try { + ryaConn.add(statement); + } catch (final RepositoryException e) { + throw new DemoExecutionException("Could not load one of the historic statements into Rya, so the demo can not continue. Exiting.", e); + } + } + } + + private static String getPcjTableName(final FluoClient fluoClient, final String sparql) { + try(Snapshot snap = fluoClient.newSnapshot()) { + final Bytes queryId = snap.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); + return snap.get(queryId, FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); + } + } + + /** + * Scan Accumulo for the results that are stored in a PCJ table. The + * multimap stores a set of deserialized binding sets that were in the PCJ + * table for every variable order that is found in the PCJ metadata. + */ + private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws DemoExecutionException { + final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); + + try { + // Get the variable orders the data was written to. + final PcjTables pcjs = new PcjTables(); + final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + + // Scan Accumulo for the stored results. + for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { + final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); + scanner.fetchColumnFamily( new Text(varOrder.toString()) ); + + for(final Entry<Key, Value> entry : scanner) { + final byte[] serializedResult = entry.getKey().getRow().getBytes(); + final BindingSet result = AccumuloPcjSerializer.deSerialize(serializedResult, varOrder.toArray()); + fetchedResults.put(varOrder.toString(), result); + } + } + } catch(PcjException | TableNotFoundException | RyaTypeResolverException e) { + throw new DemoExecutionException("Couldn't fetch the binding sets that were exported to the PCJ table, so the demo can not continue.
Exiting.", e); + } + + return fetchedResults; + } + + private static void prettyLogPcjResults(final Multimap<String, BindingSet> pcjResults) throws DemoExecutionException { + final String varOrderString = pcjResults.keySet().iterator().next(); + final Collection<BindingSet> results = pcjResults.get(varOrderString); + for(final BindingSet result : results) { + log.info(" " + result); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml new file mode 100644 index 0000000..6ef282f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.integration</artifactId> + + <name>Apache Rya PCJ Fluo Integration Tests</name> + <description>Integration tests for the Rya Fluo application.</description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.client</artifactId> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-mini</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java new file mode 100644 index 0000000..c9af9aa --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.junit.After; +import org.junit.Before; +import org.openrdf.model.Statement; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.google.common.io.Files; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.client.Snapshot; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * Integration tests that ensure the Fluo application processes PCJs results correctly. + * <p> + * This class is being ignored because it doesn't contain any unit tests. 
+ */ +public abstract class ITBase { + private static final Logger log = Logger.getLogger(ITBase.class); + + protected static final String RYA_TABLE_PREFIX = "demo_"; + + // Rya data store and connections. + protected MiniAccumuloCluster accumulo = null; + protected static Connector accumuloConn = null; + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + + // Fluo data store and connections. + protected MiniFluo fluo = null; + protected FluoClient fluoClient = null; + + @Before + public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException { + // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it. + accumulo = startMiniAccumulo(); + + // Setup the Rya library to use the Mini Accumulo. + ryaRepo = setupRya(accumulo); + ryaConn = ryaRepo.getConnection(); + + // Initialize the Mini Fluo that will be used to store created queries. + fluo = startMiniFluo(); + fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() ); + } + + @After + public void shutdownMiniResources() { + if(ryaConn != null) { + try { + log.info("Shutting down Rya Connection."); + ryaConn.close(); + log.info("Rya Connection shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Rya Connection.", e); + } + } + + if(ryaRepo != null) { + try { + log.info("Shutting down Rya Repo."); + ryaRepo.shutDown(); + log.info("Rya Repo shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Rya Repo.", e); + } + } + + if(accumulo != null) { + try { + log.info("Shutting down the Mini Accumulo being used as a Rya store."); + accumulo.stop(); + log.info("Mini Accumulo being used as a Rya store shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } + + if(fluoClient != null) { + try { + log.info("Shutting down Fluo Client."); + fluoClient.close(); + log.info("Fluo Client shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Fluo Client.", e); + } + } + + if(fluo != null) { + try { + log.info("Shutting down Mini Fluo."); + fluo.close(); + log.info("Mini Fluo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo.", e); + } + } + } + + /** + * A helper function for creating a {@link BindingSet} from an array of {@link Binding}s. + * + * @param bindings - The bindings to include in the set. (not null) + * @return A {@link BindingSet} holding the bindings. + */ + protected static BindingSet makeBindingSet(final Binding... bindings) { + final MapBindingSet bindingSet = new MapBindingSet(); + for(final Binding binding : bindings) { + bindingSet.addBinding(binding); + } + return bindingSet; + } + + /** + * A helper function for creating a {@link RyaStatement} that represents a Triple. + * + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) + * @return A Triple as a {@link RyaStatement}.
+ */ + protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) { + checkNotNull(subject); + checkNotNull(predicate); + checkNotNull(object); + + final RyaStatementBuilder builder = RyaStatement.builder() + .setSubject( new RyaURI(subject) ) + .setPredicate( new RyaURI(predicate) ); + + if(object.startsWith("http://")) { + builder.setObject(new RyaURI(object) ); + } else { + builder.setObject( new RyaType(object) ); + } + + return builder.build(); + } + + /** + * A helper function for creating a {@link RyaStatement} that represents a Triple. + * + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) + * @return A Triple as a {@link RyaStatement}. + */ + protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) { + checkNotNull(subject); + checkNotNull(predicate); + + return RyaStatement.builder() + .setSubject(new RyaURI(subject)) + .setPredicate(new RyaURI(predicate)) + .setObject( new RyaType(XMLSchema.INT, "" + object) ) + .build(); + } + + /** + * A helper function for creating a Sesame {@link Statement} that represents a Triple.. + * + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) + * @return A Triple as a {@link Statement}. + */ + protected static Statement makeStatement(final String subject, final String predicate, final String object) { + checkNotNull(subject); + checkNotNull(predicate); + checkNotNull(object); + + final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object); + return RyaToRdfConversions.convertStatement(ryaStmt); + } + + /** + * Fetches the binding sets that are the results of a specific SPARQL query + * from the Fluo table. + * + * @param fluoClient- A connection to the Fluo table where the results reside. (not null) + * @param sparql - This query's results will be fetched. (not null) + * @return The binding sets for the query's results. + */ + protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) { + final Set<BindingSet> bindingSets = new HashSet<>(); + + try(Snapshot snapshot = fluoClient.newSnapshot()) { + final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString(); + + // Fetch the query's variable order. + final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId); + final String[] varOrder = queryMetadata.getVariableOrder().toArray(); + + // Fetch the Binding Sets for the query. + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier()); + + final RowIterator rowIter = snapshot.get(scanConfig); + while(rowIter.hasNext()) { + final Entry<Bytes, ColumnIterator> row = rowIter.next(); + final String bindingSetString = row.getValue().next().getValue().toString(); + final BindingSet bindingSet = FluoStringConverter.toBindingSet(bindingSetString, varOrder); + bindingSets.add(bindingSet); + } + } + + return bindingSets; + } + + /** + * Setup a Mini Accumulo cluster that uses a temporary directory to store its data. + * + * @return A Mini Accumulo cluster. 
+ */ + private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + final File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password"); + accumulo.start(); + + // Store a connector to the Mini Accumulo. + final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); + accumuloConn = instance.getConnector("root", new PasswordToken("password")); + + return accumulo; + } + + /** + * Format a Mini Accumulo to be a Rya repository. + * + * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null) + * @return The Rya repository sitting on top of the Mini Accumulo. + */ + private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException { + checkNotNull(accumulo); + + // Setup the Rya Repository that will be used to create Repository Connections. + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + crdfdao.setConnector(accumuloConn); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("demo_"); + conf.setDisplayQueryPlan(true); + + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); + conf.set(ConfigUtils.CLOUDBASE_USER, "root"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "password"); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName()); + + crdfdao.setConf(conf); + ryaStore.setRyaDAO(crdfdao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + + return ryaRepo; + } + + /** + * Override this method to provide an output configuration to the Fluo application. + * <p> + * Returns an empty map by default. + * + * @return The parameters that will be passed to {@link QueryResultObserver} at startup. + */ + protected Map<String, String> makeExportParams() { + return new HashMap<>(); + } + + /** + * Setup a Mini Fluo cluster that uses a temporary directory to store its data. + * + * @return A Mini Fluo cluster. + */ + protected MiniFluo startMiniFluo() { + final File miniDataDir = Files.createTempDir(); + + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverConfiguration> observers = new ArrayList<>(); + observers.add(new ObserverConfiguration(TripleObserver.class.getName())); + observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName())); + observers.add(new ObserverConfiguration(JoinObserver.class.getName())); + observers.add(new ObserverConfiguration(FilterObserver.class.getName())); + + // Provide export parameters child test classes may provide to the export observer. + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName()); + exportObserverConfig.setParameters( makeExportParams() ); + observers.add(exportObserverConfig); + + // Configure how the mini fluo will run.
+ final FluoConfiguration config = new FluoConfiguration(); + config.setApplicationName("IntegrationTests"); + config.setMiniDataDir(miniDataDir.getAbsolutePath()); + config.addObservers(observers); + + final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); + return miniFluo; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java new file mode 100644 index 0000000..41c4f08 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.junit.Test; + +import com.google.common.io.Files; + +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; + +/** + * Tests the methods of {@link CountStatements}. + */ +public class CountStatementsIT extends ITBase { + + /** + * Overridden so that no Observers will be started. This ensures whatever + * statements are inserted as part of the test will not be consumed. + * + * @return A Mini Fluo cluster. + */ + @Override + protected MiniFluo startMiniFluo() { + final File miniDataDir = Files.createTempDir(); + + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverConfiguration> observers = new ArrayList<>(); + + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setApplicationName("IntegrationTests"); + config.setMiniDataDir(miniDataDir.getAbsolutePath()); + config.addObservers(observers); + + final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); + return miniFluo; + } + + @Test + public void test() { + // Insert some Triples into the Fluo app.
+ final List<RyaStatement> triples = new ArrayList<>(); + triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Alice")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); + triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Bob")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Alice")).build() ); + triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Charlie")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); + triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); + triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); + + new InsertTriples().insert(fluoClient, triples); + + // Count the statements that were loaded into the Fluo app. + final BigInteger count = new CountStatements().countStatements(fluoClient); + + // Ensure the count matches the expected value. + assertEquals(BigInteger.valueOf(5), count); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java new file mode 100644 index 0000000..c4b00e7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; +import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.sail.SailException; + +import com.google.common.collect.Sets; + +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Integration tests the methods of {@link GetPcjMetadata}. + */ +public class GetPcjMetadataIT extends ITBase { + + @Test + public void getMetadataByQueryId() throws AccumuloException, AccumuloSecurityException, TableExistsException, PcjException, NotInFluoException, NotInAccumuloException, MalformedQueryException, SailException, QueryEvaluationException { + final String sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; + final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); + + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, varOrders, sparql); + + // Ensure the command returns the correct metadata. + final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); + + final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); + final PcjMetadata metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient, queryId); + + assertEquals(expected, metadata); + } + + @Test + public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException { + final CreatePcj createPcj = new CreatePcj(); + + // Add a couple of queries to Accumulo. + final String q1Sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; + final Set<VariableOrder> q1VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); + createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q1VarOrders, q1Sparql); + + final String q2Sparql = + "SELECT ?x ?y " + + "WHERE { " + + "?x <http://talksTo> ?y. " + + "?y <http://worksAt> <http://Chipotle>." + + "}"; + final Set<VariableOrder> q2VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x", "y") ); + createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q2VarOrders, q2Sparql); + + // Ensure the command returns the correct metadata. 
+ final Set<PcjMetadata> expected = new HashSet<>(); + expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders)); + expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders)); + + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient); + assertEquals(expected, Sets.newHashSet( metadata.values() )); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java new file mode 100644 index 0000000..157412a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static org.junit.Assert.assertEquals; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.junit.Test; + +import com.google.common.collect.Sets; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Integration tests the methods of {@link GetQueryReport}. + */ +public class GetQueryReportIT extends ITBase { + + @Test + public void getReport() throws Exception { + final String sparql = + "SELECT ?worker ?company ?city " + + "{ " + + "FILTER(?worker = <http://Alice>) " + + "?worker <http://worksAt> ?company . " + + "?worker <http://livesIn> ?city ." + + "}"; + + // Triples that will be streamed into Fluo after the PCJ has been created.
+ final Set<RyaStatement> streamedTriples = Sets.newHashSet( + makeRyaStatement("http://Alice", "http://worksAt", "http://Taco Shop"), + makeRyaStatement("http://Alice", "http://worksAt", "http://Burger Join"), + makeRyaStatement("http://Alice", "http://worksAt", "http://Pastery Shop"), + makeRyaStatement("http://Alice", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://Alice", "http://livesIn", "http://Lost County"), + makeRyaStatement("http://Alice", "http://livesIn", "http://Big City"), + makeRyaStatement("http://Bob", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://Bob", "http://livesIn", "http://Big City"), + makeRyaStatement("http://Charlie", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://Charlie", "http://livesIn", "http://Big City"), + makeRyaStatement("http://David", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://David", "http://livesIn", "http://Lost County"), + makeRyaStatement("http://Eve", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://Eve", "http://livesIn", "http://Big City"), + makeRyaStatement("http://Frank", "http://worksAt", "http://Burrito Place"), + makeRyaStatement("http://Frank", "http://livesIn", "http://Lost County")); + + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples); + + // Wait for the results to finish processing. + fluo.waitForObservers(); + + // Fetch the report. + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient); + final Set<String> queryIds = metadata.keySet(); + assertEquals(1, queryIds.size()); + final String queryId = queryIds.iterator().next(); + + final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId); + + // Build the expected counts map. 
+ final Map<String, BigInteger> expectedCounts = new HashMap<>(); + + final FluoQuery fluoQuery = report.getFluoQuery(); + + final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId(); + expectedCounts.put(queryNodeId, BigInteger.valueOf(8)); + + final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId(); + expectedCounts.put(filterNodeId, BigInteger.valueOf(8)); + + final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId(); + expectedCounts.put(joinNodeId, BigInteger.valueOf(13)); + + final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator(); + final StatementPatternMetadata sp1 = patterns.next(); + final StatementPatternMetadata sp2 = patterns.next(); + if(sp1.getStatementPattern().contains("http://worksAt")) { + expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9)); + expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7)); + } else { + expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9)); + expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7)); + } + + assertEquals(expectedCounts, report.getCounts()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java new file mode 100644 index 0000000..d18a497 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_ID; +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.junit.Test; + +import com.beust.jcommander.internal.Lists; + +import io.fluo.api.types.TypedTransaction; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory; + +/** + * Integration tests the methods of {@link ListQueryIds}. 
+ */ +public class ListQueryIdsIT extends ITBase { + + private static final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory(); + + /** + * This test ensures that when there are PCJ tables in Accumulo as well as + * the Fluo table's export destinations column, the command for fetching the + * list of queries only includes queries that appear in both places. + */ + @Test + public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException { + // Store a few SPARQL/Query ID pairs in the Fluo table. + try(TypedTransaction tx = new StringTypeLayer().wrap( fluoClient.newTransaction() )) { + tx.mutate().row("SPARQL_3").col(QUERY_ID).set("ID_3"); + tx.mutate().row("SPARQL_1").col(QUERY_ID).set("ID_1"); + tx.mutate().row("SPARQL_4").col(QUERY_ID).set("ID_4"); + tx.mutate().row("SPARQL_2").col(QUERY_ID).set("ID_2"); + tx.commit(); + } + + // Ensure the correct list of Query IDs is returned. + final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4"); + final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(expected, queryIds); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java new file mode 100644 index 0000000..231c8f1 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.junit.Assert.assertEquals; + +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import io.fluo.api.client.Snapshot; +import io.fluo.api.client.Transaction; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Integration tests the methods of {@link FluoQueryMetadataDAO}.
+ */ +public class FluoQueryMetadataDAOIT extends ITBase { + + @Test + public void statementPatternMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final StatementPatternMetadata.Builder builder = StatementPatternMetadata.builder("nodeId"); + builder.setVarOrder(new VariableOrder("a;b;c")); + builder.setStatementPattern("statementPattern"); + builder.setParentNodeId("parentNodeId"); + final StatementPatternMetadata originalMetadata = builder.build(); + + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + StatementPatternMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } + + @Test + public void filterMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final FilterMetadata.Builder builder = FilterMetadata.builder("nodeId"); + builder.setVarOrder(new VariableOrder("e;f")); + builder.setParentNodeId("parentNodeId"); + builder.setChildNodeId("childNodeId"); + builder.setOriginalSparql("originalSparql"); + builder.setFilterIndexWithinSparql(2); + final FilterMetadata originalMetadata = builder.build(); + + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + FilterMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readFilterMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } + + @Test + public void joinMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId"); + builder.setVariableOrder(new VariableOrder("g;y;s")); + builder.setParentNodeId("parentNodeId"); + builder.setLeftChildNodeId("leftChildNodeId"); + builder.setRightChildNodeId("rightChildNodeId"); + final JoinMetadata originalMetadata = builder.build(); + + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + JoinMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readJoinMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } + + @Test + public void queryMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final QueryMetadata.Builder builder = QueryMetadata.builder("nodeId"); + builder.setVariableOrder(new VariableOrder("y;s;d")); + builder.setSparql("sparql string"); + builder.setChildNodeId("childNodeId"); + final QueryMetadata originalMetadata = builder.build(); + + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. 
+        QueryMetadata storedMetadata = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedMetadata = dao.readQueryMetadata(sx, "nodeId");
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalMetadata, storedMetadata);
+    }
+
+    @Test
+    public void fluoQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql =
+                "SELECT ?customer ?worker ?city " +
+                "{ " +
+                "FILTER(?customer = <http://Alice>) " +
+                "FILTER(?city = <http://London>) " +
+                "?customer <http://talksTo> ?worker. " +
+                "?worker <http://livesIn> ?city. " +
+                "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
+        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds());
+
+        // Write it to the Fluo table.
+        try(Transaction tx = fluoClient.newTransaction()) {
+            dao.write(tx, originalQuery);
+            tx.commit();
+        }
+
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId());
+        }
+
+        // Ensure the deserialized object is the same as the serialized one.
+        assertEquals(originalQuery, storedQuery);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
new file mode 100644
index 0000000..41c2d7d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.collect.Sets;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+/**
+ * Performs integration tests over the Fluo application geared towards various types of input.
+ * <p>
+ * These tests are being ignored so that they will not run as unit tests while building the application.
+ */
+public class InputIT extends ITBase {
+
+    /**
+     * Ensure historic matches are included in the result.
+     */
+    @Test
+    public void historicResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+                "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Bob", "http://talksTo", "http://Eve"),
+                makeStatement("http://Charlie", "http://talksTo", "http://Eve"),
+
+                makeStatement("http://Eve", "http://helps", "http://Kevin"),
+
+                makeStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+                makeStatement("http://David", "http://worksAt", "http://Chipotle"));
+
+        // The expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Bob"))));
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+
+        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Ensure streamed matches are included in the result.
+     */
+    @Test
+    public void streamedResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+                "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeRyaStatement("http://Bob", "http://talksTo", "http://Eve"),
+                makeRyaStatement("http://Charlie", "http://talksTo", "http://Eve"),
+
+                makeRyaStatement("http://Eve", "http://helps", "http://Kevin"),
+
+                makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+                makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+                makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+                makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"));
+
+        // The expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Bob"))));
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure the query has no results yet.
+        fluo.waitForObservers();
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertTrue( results.isEmpty() );
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Simulates the case where a Triple is added to Rya, a new query that includes
+     * that triple as a historic match is inserted into Fluo, and then some new
+     * triple that matches the query is streamed into Fluo. The query's results
+     * must include both the historic result and the newly streamed result.
+     */
+    @Test
+    public void historicThenStreamedResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+                "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Frank", "http://talksTo", "http://Eve"),
+                makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Alice"))));
+
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+
+        // Stream the data into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query also include Frank.
+        fluo.waitForObservers();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Frank"))));
+
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+
+    /**
+     * Simulates the case where a Triple is added to Rya, a new query that
+     * includes the triple as a historic match is inserted into Fluo, and then
+     * the same triple is streamed into Fluo. The query's results will already
+     * include the Triple because it was added while the query was being
+     * created. This case should not fail or affect the end results in any way.
+     */
+    @Test
+    public void historicAndStreamConflict() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+                "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+                "}";
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<Statement> historicTriples = Sets.newHashSet(
+                makeStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+
+        // Triples that will be streamed into Fluo after the PCJ has been created.
+        final Set<RyaStatement> streamedTriples = Sets.newHashSet(
+                makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"),
+                makeRyaStatement("http://Alice", "http://worksAt", "http://Chipotle"));
+
+        // The expected final result.
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Alice"))));
+
+        // Load the historic data into Rya.
+        for(final Statement triple : historicTriples) {
+            ryaConn.add(triple);
+        }
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Ensure Alice is a match.
+        fluo.waitForObservers();
+        Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+
+        // Stream the same Alice triples into Fluo.
+        new InsertTriples().insert(fluoClient, streamedTriples);
+
+        // Verify the end results of the query are still only Alice.
+        fluo.waitForObservers();
+        results = getQueryBindingSetValues(fluoClient, sparql);
+        assertEquals(expected, results);
+    }
+}
\ No newline at end of file
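Note: the InputIT tests above all exercise the same lifecycle: register a query with CreatePcj (which also loads any historic matches already stored in Rya), stream new statements with InsertTriples, wait for the Fluo observers to settle, and then read the query's binding sets. The snippet below is only a sketch of that pattern outside JUnit, not part of the commit; it reuses the imports shown in InputIT and assumes the fluoClient, fluo, ryaRepo, accumuloConn, RYA_TABLE_PREFIX handles and the makeRyaStatement(...) / getQueryBindingSetValues(...) helpers provided by the ITBase harness.

    // A sketch of the create-then-stream pattern exercised by InputIT (assumes the
    // ITBase-provided handles and helpers listed above are in scope).
    private void createThenStreamSketch() throws Exception {
        final String sparql =
                "SELECT ?x " +
                "WHERE { " +
                "?x <http://talksTo> <http://Eve>. " +
                "?x <http://worksAt> <http://Chipotle>." +
                "}";

        // Register the query with the Fluo application. Statements already stored in
        // Rya are loaded into the PCJ as historic matches.
        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn,
                new HashSet<VariableOrder>(), sparql);

        // Stream new statements into Fluo; the observers incrementally update the results.
        new InsertTriples().insert(fluoClient, Sets.newHashSet(
                makeRyaStatement("http://Frank", "http://talksTo", "http://Eve"),
                makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")));

        // Wait for the observers to finish, then read the query's current binding sets.
        fluo.waitForObservers();
        final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql);
        System.out.println(results);
    }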
