http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 5e12fac..7fa28ab 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -18,58 +18,39 @@ */ package org.apache.rya.indexing.pcj.fluo.integration; +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.Properties; +import java.util.Map; import java.util.Set; +import java.util.UUID; -import org.I0Itec.zkclient.ZkClient; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.pcj.fluo.ITBase; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.junit.Test; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.Binding; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.sail.SailRepositoryConnection; -import com.google.common.base.Optional; import com.google.common.collect.Sets; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; - /** * Performs integration tests over the Fluo application geared towards Kafka PCJ exporting. * <p> @@ -78,215 +59,463 @@ import kafka.zk.EmbeddedZookeeper; * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration * $ mvn surefire:test -Dtest=KafkaExportIT */ -public class KafkaExportIT extends ITBase { - private static final Log logger = LogFactory.getLog(KafkaExportIT.class); - - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private static final String TOPIC = "testTopic"; - private ZkUtils zkUtils; - private KafkaServer kafkaServer; - private EmbeddedZookeeper zkServer; - private ZkClient zkClient; - - - /** - * setup mini kafka and call the super to setup mini fluo - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources() - */ - @Override - public void setupMiniResources() throws Exception { - super.setupMiniResources(); - - zkServer = new EmbeddedZookeeper(); - String zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - KafkaConfig config = new KafkaConfig(brokerProps); - Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - - logger.trace("setup kafka and fluo."); - } - - /** - * Test kafka without rya code to make sure kafka works in this environment. - * If this test fails then its a testing environment issue, not with Rya. - * Source: https://github.com/asmaier/mini-kafka - * - * @throws InterruptedException - * @throws IOException - */ - @Test - public void embeddedKafkaTest() throws InterruptedException, IOException { - // create topic - AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - - // setup producer - Properties producerProps = new Properties(); - producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); - producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps); - - // setup consumer - Properties consumerProps = new Properties(); - consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - consumerProps.setProperty("group.id", "group0"); - consumerProps.setProperty("client.id", "consumer0"); - consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic - KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(TOPIC)); - - // send message - ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8)); - producer.send(data); - producer.close(); - - // starting consumer - ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); - assertEquals(1, records.count()); - Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); - ConsumerRecord<Integer, byte[]> record = recordIterator.next(); - logger.trace(String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value())); - assertEquals(42, (int) record.key()); - assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); - consumer.close(); - } +public class KafkaExportIT extends KafkaExportITBase { @Test public void newResultsExportedTest() throws Exception { - final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = <http://Alice>) " + "FILTER(?city = <http://London>) " + "?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + "?worker <http://worksAt> <http://Chipotle>. " + "}"; - + final String sparql = + "SELECT ?customer ?worker ?city { " + + "FILTER(?customer = <http://Alice>) " + + "FILTER(?city = <http://London>) " + + "?customer <http://talksTo> ?worker. " + + "?worker <http://livesIn> ?city. " + + "?worker <http://worksAt> <http://Chipotle>. " + + "}"; + // Triples that will be streamed into Fluo after the PCJ has been created. - final Set<RyaStatement> streamedTriples = Sets.newHashSet(makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), makeRyaStatement("http://Bob", "http://livesIn", "http://London"), makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Alice", "http://talksTo", "http://David"), makeRyaStatement("http://David", "http://livesIn", "http://London"), makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), makeRyaStatement("http://Frank", "http://livesIn", "http://London"), makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); - + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = + Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Bob")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Charlie")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://David")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://livesIn"), vf.createURI("http://Leeds")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London")))); - expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), new BindingImpl("city", new URIImpl("http://London")))); - expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - CreatePcj createPcj = new CreatePcj(); - String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String> absent()); - - // Fetch the exported results from Accumulo once the observers finish working. - fluo.waitForObservers(); - - /// KafkaConsumer<Integer, byte[]> consumer = makeConsumer(QueryIdIsTopicName); - KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(QueryIdIsTopicName); - - // starting consumer polling for messages - /// ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); - ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(3000); - /// Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); - Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); - boolean allExpected = true; - ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null; - while (recordIterator.hasNext()) { - ConsumerRecord<Integer, VisibilityBindingSet> record = recordIterator.next(); - logger.trace(String.format("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString())); - boolean expectedThis = expected.contains(record.value()); - if (!expectedThis) { - logger.trace("This consumed record is not expected."); - unexpectedRecord = record; - } - allExpected = allExpected && expectedThis; - } - assertTrue("Must consume expected record: not expected:" + unexpectedRecord, allExpected); - assertNotEquals("Should get some results", 0, records.count()); - // assertEquals(42, (int) record.key()); - // assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + final Set<BindingSet> expectedResult = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Bob")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResult.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Charlie")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResult.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://David")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResult.add( new VisibilityBindingSet(bs) ); + + // Ensure the last result matches the expected result. + final Set<VisibilityBindingSet> result = readAllResults(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void min() throws Exception { + // A query that finds the minimum price for an item within the inventory. + final String sparql = + "SELECT (min(?price) as ?minPrice) { " + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("minPrice", vf.createLiteral(0.99)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void max() throws Exception { + // A query that finds the maximum price for an item within the inventory. + final String sparql = + "SELECT (max(?price) as ?maxPrice) { " + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("maxPrice", vf.createLiteral(4.99)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void count() throws Exception { + // A query that counts the number of unique items that are in the inventory. + final String sparql = + "SELECT (count(?item) as ?itemCount) { " + + "?item <urn:id> ?id . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + // Three that are part of the count. + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:id"), vf.createLiteral(UUID.randomUUID().toString())), + + // One that is not. + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(3.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("itemCount", vf.createLiteral("3", XMLSchema.INTEGER)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void sum() throws Exception { + // A query that sums the counts of all of the items that are in the inventory. + final String sparql = + "SELECT (sum(?count) as ?itemSum) { " + + "?item <urn:count> ?count . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(5)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:count"), vf.createLiteral(7)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:count"), vf.createLiteral(2))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("itemSum", vf.createLiteral("14", XMLSchema.INTEGER)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void average() throws Exception { + // A query that finds the average price for an item that is in the inventory. + final String sparql = + "SELECT (avg(?price) as ?averagePrice) { " + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(3)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(4)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(8))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("averagePrice", vf.createLiteral("5", XMLSchema.DECIMAL)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); + } + + @Test + public void aggregateWithFilter() throws Exception { + // A query that filters results from a statement pattern before applying the aggregation function. + final String sparql = + "SELECT (min(?price) as ?minPrice) { " + + "FILTER(?price > 1.00) " + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("minPrice", vf.createLiteral(2.50)); + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); } - /** - * A helper function for creating a {@link BindingSet} from an array of - * {@link Binding}s. - * - * @param bindings - * - The bindings to include in the set. (not null) - * @return A {@link BindingSet} holding the bindings. - */ - protected static BindingSet makeBindingSet(final Binding... bindings) { - return new VisibilityBindingSet(ITBase.makeBindingSet(bindings)); + @Test + public void multipleAggregations() throws Exception { + // A query that both counts the number of items being averaged and finds the average price. + final String sparql = + "SELECT (count(?item) as ?itemCount) (avg(?price) as ?averagePrice) {" + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(5.25)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(7)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(2.75))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final MapBindingSet expectedResult = new MapBindingSet(); + expectedResult.addBinding("itemCount", vf.createLiteral("3", XMLSchema.INTEGER)); + expectedResult.addBinding("averagePrice", vf.createLiteral("5.0", XMLSchema.DECIMAL)); + + // Ensure the last result matches the expected result. + final VisibilityBindingSet result = readLastResult(pcjId); + assertEquals(expectedResult, result); } - /** - * @param TopicName - * @return - */ - protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(String TopicName) { - // setup consumer - Properties consumerProps = new Properties(); - consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); - consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); - // "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic - /// KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); - KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(TopicName)); - return consumer; + @Test + public void groupBySingleBinding() throws Exception { + // A query that groups what is aggregated by one of the keys. + final String sparql = + "SELECT ?item (avg(?price) as ?averagePrice) {" + + "?item <urn:price> ?price . " + + "} " + + "GROUP BY ?item"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(5.25)), + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(7)), + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.75)), + vf.createStatement(vf.createURI("urn:banana"), vf.createURI("urn:price"), vf.createLiteral(2.75)), + vf.createStatement(vf.createURI("urn:banana"), vf.createURI("urn:price"), vf.createLiteral(1.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("item", vf.createURI("urn:apple")); + bs.addBinding("averagePrice", vf.createLiteral("5.0", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("item", vf.createURI("urn:banana")); + bs.addBinding("averagePrice", vf.createLiteral("2.37", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + // Verify the end results of the query match the expected results. + final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("item")); + assertEquals(expectedResults, results); } - /** - * Add info about the kafka queue/topic to receive the export. - * Call super to get the Rya parameters. - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) - */ - @Override - protected void setExportParameters(HashMap<String, String> exportParams) { - // Get the defaults - super.setExportParameters(exportParams); - // Add the kafka parameters - final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); - kafkaParams.setExportToKafka(true); - // Configure the Producer - Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); - // "org.apache.kafka.common.serialization.StringSerializer"); - kafkaParams.addAllProducerConfig(producerConfig); + @Test + public void groupByManyBindings_avaerages() throws Exception { + // A query that groups what is aggregated by two of the keys. + final String sparql = + "SELECT ?type ?location (avg(?price) as ?averagePrice) {" + + "?id <urn:type> ?type . " + + "?id <urn:location> ?location ." + + "?id <urn:price> ?price ." + + "} " + + "GROUP BY ?type ?location"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + // American items that will be averaged. + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createLiteral("apple")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(.99)), + + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)), + + // French items that will be averaged. + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(8.5)), + + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)), + + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("apple", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("2.5", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("3.12", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cigarettes", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + // Verify the end results of the query match the expected results. + final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location")); + assertEquals(expectedResults, results); } - /** - * Close all the Kafka mini server and mini-zookeeper - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() - */ - @Override - public void shutdownMiniResources() { - super.shutdownMiniResources(); - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); + private String loadData(final String sparql, final Collection<Statement> statements) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); + + // Register the PCJ with Rya. + final Instance accInstance = super.getAccumuloConnector().getInstance(); + final Connector accumuloConn = super.getAccumuloConnector(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + accInstance.getInstanceName(), + accInstance.getZooKeepers()), accumuloConn); + + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + + // Write the data to Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); + ryaConn.begin(); + ryaConn.add(statements); + ryaConn.commit(); + ryaConn.close(); + + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); + + // The PCJ Id is the topic name the results will be written to. + return pcjId; + } + + private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception { + requireNonNull(pcjId); + + // Read all of the results from the Kafka topic. + final Set<VisibilityBindingSet> results = new HashSet<>(); + + try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + results.add( recordIterator.next().value() ); + } + } + + return results; + } + + private VisibilityBindingSet readLastResult(final String pcjId) throws Exception { + requireNonNull(pcjId); + + // Read the results from the Kafka topic. The last one has the final aggregation result. + VisibilityBindingSet result = null; + + try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + result = recordIterator.next().value(); + } + } + + return result; + } + + private Set<VisibilityBindingSet> readGroupedResults(final String pcjId, final VariableOrder groupByVars) { + requireNonNull(pcjId); + + // Read the results from the Kafka topic. The last one for each set of Group By values is an aggregation result. + // The key in this map is a Binding Set containing only the group by variables. + final Map<BindingSet, VisibilityBindingSet> results = new HashMap<>(); + + try(final KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { + final ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(5000); + final Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + final VisibilityBindingSet visBindingSet = recordIterator.next().value(); + + final MapBindingSet key = new MapBindingSet(); + for(final String groupByBar : groupByVars) { + key.addBinding( visBindingSet.getBinding(groupByBar) ); + } + + results.put(key, visBindingSet); + } + } + + return Sets.newHashSet( results.values() ); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 648c5b9..08bf2e1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -18,44 +18,47 @@ */ package org.apache.rya.indexing.pcj.fluo.integration; +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import java.math.BigDecimal; +import java.util.Collection; import java.util.HashSet; import java.util.Set; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.indexing.pcj.fluo.ITBase; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import javax.xml.datatype.DatatypeFactory; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.Literal; +import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; import org.openrdf.model.ValueFactory; import org.openrdf.model.datatypes.XMLDatatypeUtil; import org.openrdf.model.impl.BooleanLiteralImpl; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.NumericLiteralImpl; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.XMLSchema; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; import org.openrdf.query.algebra.evaluation.function.Function; import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.sail.SailRepositoryConnection; -import com.google.common.base.Optional; import com.google.common.collect.Sets; + /** * Performs integration tests over the Fluo application geared towards various query structures. - * <p> - * These tests are being ignore so that they will not run as unit tests while building the application. */ -public class QueryIT extends ITBase { +public class QueryIT extends RyaExportITBase { @Test public void optionalStatements() throws Exception { @@ -69,40 +72,35 @@ public class QueryIT extends ITBase { "OPTIONAL {?person <http://passedExam> ?exam } . " + "}"; - // Triples that will be streamed into Fluo after the PCJ has been created. - final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://hasDegreeIn", "http://Computer Science"), - makeRyaStatement("http://Alice", "http://passedExam", "http://Certified Ethical Hacker"), - makeRyaStatement("http://Bob", "http://hasDegreeIn", "http://Law"), - makeRyaStatement("http://Bob", "http://passedExam", "http://MBE"), - makeRyaStatement("http://Bob", "http://passedExam", "http://BAR-Kansas"), - makeRyaStatement("http://Charlie", "http://hasDegreeIn", "http://Law")); - - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add( makeBindingSet( - new BindingImpl("person", new URIImpl("http://Bob")), - new BindingImpl("exam", new URIImpl("http://MBE")))); - expected.add( makeBindingSet( - new BindingImpl("person", new URIImpl("http://Bob")), - new BindingImpl("exam", new URIImpl("http://BAR-Kansas")))); - expected.add( makeBindingSet( - new BindingImpl("person", new URIImpl("http://Charlie")))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Computer Science")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://passedExam"), vf.createURI("http://Certified Ethical Hacker")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Law")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://MBE")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://BAR-Kansas")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Law"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("http://Bob")); + bs.addBinding("exam", vf.createURI("http://MBE")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("http://Bob")); + bs.addBinding("exam", vf.createURI("http://BAR-Kansas")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("http://Charlie")); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); } /** @@ -124,71 +122,66 @@ public class QueryIT extends ITBase { "?candidate <http://talksTo> ?leader. " + "}"; - // Triples that will be streamed into Fluo after the PCJ has been created. - final Set<RyaStatement> streamedTriples = Sets.newHashSet( + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( // Leaders - makeRyaStatement("http://Alice", "http://leaderOf", "http://GeekSquad"), - makeRyaStatement("http://Bob", "http://leaderOf", "http://GeekSquad"), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://leaderOf"), vf.createURI("http://GeekSquad")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://leaderOf"), vf.createURI("http://GeekSquad")), // Recruiters - makeRyaStatement("http://Charlie", "http://recruiterFor", "http://GeekSquad"), - makeRyaStatement("http://David", "http://recruiterFor", "http://GeekSquad"), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://recruiterFor"), vf.createURI("http://GeekSquad")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://recruiterFor"), vf.createURI("http://GeekSquad")), // Candidates - makeRyaStatement("http://Eve", "http://skilledWith", "http://Computers"), - makeRyaStatement("http://Eve", "http://livesIn", "USA"), - makeRyaStatement("http://Frank", "http://skilledWith", "http://Computers"), - makeRyaStatement("http://Frank", "http://livesIn", "USA"), - makeRyaStatement("http://George", "http://skilledWith", "http://Computers"), - makeRyaStatement("http://George", "http://livesIn", "Germany"), - makeRyaStatement("http://Harry", "http://skilledWith", "http://Negotiating"), - makeRyaStatement("http://Harry", "http://livesIn", "USA"), - makeRyaStatement("http://Ivan", "http://skilledWith", "http://Computers"), - makeRyaStatement("http://Ivan", "http://livesIn", "USA"), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("http://George"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), + vf.createStatement(vf.createURI("http://George"), vf.createURI("http://livesIn"), vf.createLiteral("Germany")), + vf.createStatement(vf.createURI("http://Harry"), vf.createURI("http://skilledWith"), vf.createURI("http://Negotiating")), + vf.createStatement(vf.createURI("http://Harry"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("http://Ivan"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), + vf.createStatement(vf.createURI("http://Ivan"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), // Candidates the recruiters talk to. - makeRyaStatement("http://Charlie", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Charlie", "http://talksTo", "http://George"), - makeRyaStatement("http://Charlie", "http://talksTo", "http://Harry"), - makeRyaStatement("http://David", "http://talksTo", "http://Eve"), - makeRyaStatement("http://David", "http://talksTo", "http://Frank"), - makeRyaStatement("http://David", "http://talksTo", "http://Ivan"), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://George")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Harry")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Frank")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Ivan")), // Recruits that talk to leaders. - makeRyaStatement("http://Eve", "http://talksTo", "http://Alice"), - makeRyaStatement("http://George", "http://talksTo", "http://Alice"), - makeRyaStatement("http://Harry", "http://talksTo", "http://Bob"), - makeRyaStatement("http://Ivan", "http://talksTo", "http://Bob")); - - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add( makeBindingSet( - new BindingImpl("recruiter", new URIImpl("http://Charlie")), - new BindingImpl("candidate", new URIImpl("http://Eve")), - new BindingImpl("leader", new URIImpl("http://Alice")))); - expected.add( makeBindingSet( - new BindingImpl("recruiter", new URIImpl("http://David")), - new BindingImpl("candidate", new URIImpl("http://Eve")), - new BindingImpl("leader", new URIImpl("http://Alice")))); - expected.add( makeBindingSet( - new BindingImpl("recruiter", new URIImpl("http://David")), - new BindingImpl("candidate", new URIImpl("http://Ivan")), - new BindingImpl("leader", new URIImpl("http://Bob")))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://George"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Harry"), vf.createURI("http://talksTo"), vf.createURI("http://Bob")), + vf.createStatement(vf.createURI("http://Ivan"), vf.createURI("http://talksTo"), vf.createURI("http://Bob"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("recruiter", vf.createURI("http://Charlie")); + bs.addBinding("candidate", vf.createURI("http://Eve")); + bs.addBinding("leader", vf.createURI("http://Alice")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("recruiter", vf.createURI("http://David")); + bs.addBinding("candidate", vf.createURI("http://Eve")); + bs.addBinding("leader", vf.createURI("http://Alice")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("recruiter", vf.createURI("http://David")); + bs.addBinding("candidate", vf.createURI("http://Ivan")); + bs.addBinding("leader", vf.createURI("http://Bob")); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); } @Test @@ -203,57 +196,52 @@ public class QueryIT extends ITBase { "?worker <http://worksAt> <http://Chipotle>. " + "}"; - // Triples that will be streamed into Fluo after the PCJ has been created. - final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), - makeRyaStatement("http://Bob", "http://livesIn", "http://London"), - makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), - - makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), - makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), - makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - - makeRyaStatement("http://Alice", "http://talksTo", "http://David"), - makeRyaStatement("http://David", "http://livesIn", "http://London"), - makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), - - makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), - makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), - - makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), - makeRyaStatement("http://Frank", "http://livesIn", "http://London"), - makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); - - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London")))); - expected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London")))); - expected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://David")), - new BindingImpl("city", new URIImpl("http://London")))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Bob")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Charlie")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://David")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://livesIn"), vf.createURI("http://Leeds")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://livesIn"), vf.createURI("http://London")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Bob")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Charlie")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://David")); + bs.addBinding("city", vf.createURI("http://London")); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); } @Test @@ -266,69 +254,51 @@ public class QueryIT extends ITBase { "?name <http://playsSport> \"Soccer\" " + "}"; - final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://hasAge", 18), - makeRyaStatement("http://Bob", "http://hasAge", 30), - makeRyaStatement("http://Charlie", "http://hasAge", 14), - makeRyaStatement("http://David", "http://hasAge", 16), - makeRyaStatement("http://Eve", "http://hasAge", 35), - - makeRyaStatement("http://Alice", "http://playsSport", "Soccer"), - makeRyaStatement("http://Bob", "http://playsSport", "Soccer"), - makeRyaStatement("http://Charlie", "http://playsSport", "Basketball"), - makeRyaStatement("http://Charlie", "http://playsSport", "Soccer"), - makeRyaStatement("http://David", "http://playsSport", "Basketball")); - - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add( makeBindingSet( - new BindingImpl("name", new URIImpl("http://Alice")), - new BindingImpl("age", new NumericLiteralImpl(18, XMLSchema.INTEGER)))); - expected.add( makeBindingSet( - new BindingImpl("name", new URIImpl("http://Charlie")), - new BindingImpl("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasAge"), vf.createLiteral(18)), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://hasAge"), vf.createLiteral(30)), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://hasAge"), vf.createLiteral(14)), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://hasAge"), vf.createLiteral(16)), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://hasAge"), vf.createLiteral(35)), + + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("http://Alice")); + bs.addBinding("age", vf.createLiteral("18", XMLSchema.INTEGER)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("http://Charlie")); + bs.addBinding("age", vf.createLiteral("14", XMLSchema.INTEGER)); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); } - + @Test public void withCustomFilters() throws Exception { - final String sparql = "prefix ryafunc: <tag:rya.apache.org,2017:function#> \n" // - + "SELECT ?name ?age \n" // - + "{ \n" // - + "FILTER( ryafunc:isTeen(?age) ) . \n" // - + "?name <http://hasAge> ?age . \n" // - + "?name <http://playsSport> \"Soccer\" \n" // - + "}"; // - - final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://hasAge", 18), - makeRyaStatement("http://Bob", "http://hasAge", 30), - makeRyaStatement("http://Charlie", "http://hasAge", 14), - makeRyaStatement("http://David", "http://hasAge", 16), - makeRyaStatement("http://Eve", "http://hasAge", 35), - - makeRyaStatement("http://Alice", "http://playsSport", "Soccer"), - makeRyaStatement("http://Bob", "http://playsSport", "Soccer"), - makeRyaStatement("http://Charlie", "http://playsSport", "Basketball"), - makeRyaStatement("http://Charlie", "http://playsSport", "Soccer"), - makeRyaStatement("http://David", "http://playsSport", "Basketball")); - - Function fooFunction = new Function() { + final String sparql = + "prefix ryafunc: <tag:rya.apache.org,2017:function#> " + + "SELECT ?name ?age " + + "{ " + + "FILTER( ryafunc:isTeen(?age) ) . " + + "?name <http://hasAge> ?age . " + + "?name <http://playsSport> \"Soccer\" . " + + "}"; + // Register a custom Filter. + final Function fooFunction = new Function() { @Override public String getURI() { return "tag:rya.apache.org,2017:function#isTeen"; @@ -337,24 +307,22 @@ public class QueryIT extends ITBase { final static int TEEN_THRESHOLD = 20; @Override - public Value evaluate(ValueFactory valueFactory, Value... args) throws ValueExprEvaluationException { - + public Value evaluate(final ValueFactory valueFactory, final Value... args) throws ValueExprEvaluationException { if (args.length != 1) { throw new ValueExprEvaluationException("isTeen() requires exactly 1 argument, got " + args.length); } if (args[0] instanceof Literal) { - Literal literal = (Literal) args[0]; - - URI datatype = literal.getDatatype(); + final Literal literal = (Literal) args[0]; + final URI datatype = literal.getDatatype(); // ABS function accepts only numeric literals if (datatype != null && XMLDatatypeUtil.isNumericDatatype(datatype)) { if (XMLDatatypeUtil.isDecimalDatatype(datatype)) { - BigDecimal bigValue = literal.decimalValue(); + final BigDecimal bigValue = literal.decimalValue(); return BooleanLiteralImpl.valueOf(bigValue.compareTo(new BigDecimal(TEEN_THRESHOLD)) < 0); } else if (XMLDatatypeUtil.isFloatingPointDatatype(datatype)) { - double doubleValue = literal.doubleValue(); + final double doubleValue = literal.doubleValue(); return BooleanLiteralImpl.valueOf(doubleValue < TEEN_THRESHOLD); } else { throw new ValueExprEvaluationException("unexpected datatype (expect decimal/int or floating) for function operand: " + args[0]); @@ -371,82 +339,134 @@ public class QueryIT extends ITBase { // Add our new function to the registry FunctionRegistry.getInstance().add(fooFunction); - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add( makeBindingSet( - new BindingImpl("name", new URIImpl("http://Alice")), - new BindingImpl("age", new NumericLiteralImpl(18, XMLSchema.INTEGER)))); - expected.add( makeBindingSet( - new BindingImpl("name", new URIImpl("http://Charlie")), - new BindingImpl("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasAge"), vf.createLiteral(18)), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://hasAge"), vf.createLiteral(30)), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://hasAge"), vf.createLiteral(14)), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://hasAge"), vf.createLiteral(16)), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://hasAge"), vf.createLiteral(35)), + + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("http://Alice")); + bs.addBinding("age", vf.createLiteral("18", XMLSchema.INTEGER)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("http://Charlie")); + bs.addBinding("age", vf.createLiteral("14", XMLSchema.INTEGER)); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); } @Test public void withTemporal() throws Exception { + // A query that finds all stored data after 3 seconds. final String dtPredUri = "http://www.w3.org/2006/time#inXSDDateTime"; final String dtPred = "<" + dtPredUri + ">"; - final String xmlDateTime = "http://www.w3.org/2001/XMLSchema#dateTime"; - // Find all stored dates. - String selectQuery = "PREFIX time: <http://www.w3.org/2006/time#> \n"// - + "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> \n" // - + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n"// - + "SELECT ?event ?time \n" // - + "WHERE { \n" // - + " ?event " + dtPred + " ?time . \n"// - // + " FILTER(?time > '2000-01-01T01:00:00Z'^^xml:dateTime) \n"// all - // + " FILTER(?time < '2007-01-01T01:01:03-08:00'^^xml:dateTime) \n"// after 2007 - + " FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) \n"// after 3 seconds - + "}";// - - // create some resources and literals to make statements out of - String eventz = "<http://eventz>"; - final Set<RyaStatement> streamedTriples = Sets.newHashSet(// - makeRyaStatement(eventz, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", "<http://www.w3.org/2006/time#Instant>"), // - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2001-01-01T01:01:01-08:00")), // one second - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2001-01-01T04:01:02.000-05:00")), // 2 seconds - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2001-01-01T01:01:03-08:00")), // 3 seconds - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2001-01-01T01:01:04-08:00")), // 4seconds - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2001-01-01T09:01:05Z")), // 5 seconds - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2006-01-01")), // - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2007-01-01")), // - makeRyaStatement(eventz, dtPredUri, new RyaType(new URIImpl(xmlDateTime), "2008-01-01"))); - - // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("event", new URIImpl(eventz)), new BindingImpl("time", new LiteralImpl("2001-01-01T09:01:04.000Z", new URIImpl(xmlDateTime))))); // - expected.add(makeBindingSet(new BindingImpl("event", new URIImpl(eventz)), new BindingImpl("time", new LiteralImpl("2001-01-01T09:01:05.000Z", new URIImpl(xmlDateTime))))); // - expected.add(makeBindingSet(new BindingImpl("event", new URIImpl(eventz)), new BindingImpl("time", new LiteralImpl("2006-01-01T05:00:00.000Z", new URIImpl(xmlDateTime))))); // - expected.add(makeBindingSet(new BindingImpl("event", new URIImpl(eventz)), new BindingImpl("time", new LiteralImpl("2007-01-01T05:00:00.000Z", new URIImpl(xmlDateTime))))); // - expected.add(makeBindingSet(new BindingImpl("event", new URIImpl(eventz)), new BindingImpl("time", new LiteralImpl("2008-01-01T05:00:00.000Z", new URIImpl(xmlDateTime))))); - - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(selectQuery); - - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String> absent()); + + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time#> " + + "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> " + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " + + "SELECT ?event ?time " + + "WHERE { " + + "?event " + dtPred + " ?time . " + + "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://eventz"), vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createURI("http://www.w3.org/2006/time#Instant")), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 second + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 4 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z")))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("event", vf.createURI("http://eventz")); + bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:04.000Z"))); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("event", vf.createURI("http://eventz")); + bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05.000Z"))); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("event", vf.createURI("http://eventz")); + bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("event", vf.createURI("http://eventz")); + bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("event", vf.createURI("http://eventz")); + bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z"))); + expectedResults.add(bs); // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, selectQuery); - assertEquals(expected, results); + runTest(sparql, statements, expectedResults); + } + + public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); + requireNonNull(expectedResults); + + // Register the PCJ with Rya. + final Instance accInstance = super.getAccumuloConnector().getInstance(); + final Connector accumuloConn = super.getAccumuloConnector(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + accInstance.getInstanceName(), + accInstance.getZooKeepers()), accumuloConn); + + ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + + // Write the data to Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); + ryaConn.begin(); + ryaConn.add(statements); + ryaConn.commit(); + ryaConn.close(); + + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); + + // Fetch the value that is stored within the PCJ table. + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) { + final String pcjId = pcjStorage.listPcjs().get(0); + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + + // Ensure the result of the query matches the expected result. + assertEquals(expectedResults, results); + } } -} +} \ No newline at end of file
