Repository: incubator-rya Updated Branches: refs/heads/master 646d21b4e -> 60090ad52
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java index 3ed1844..452dd27 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java @@ -18,18 +18,22 @@ */ package org.apache.rya.indexing.pcj.fluo; +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Properties; import org.I0Itec.zkclient.ZkClient; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.recipes.test.AccumuloExportITBase; @@ -40,7 +44,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.client.Install.InstallConfiguration; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; @@ -48,7 +54,9 @@ import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; @@ -60,8 +68,11 @@ import org.apache.rya.sail.config.RyaSailFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; + import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; @@ -74,7 +85,8 @@ import kafka.utils.ZkUtils; import kafka.zk.EmbeddedZookeeper; /** - * The base Integration Test class used for Fluo applications that export to a Kakfa topic. + * The base Integration Test class used for Fluo applications that export to a + * Kafka topic.
*/ public class KafkaExportITBase extends AccumuloExportITBase { @@ -88,8 +100,10 @@ public class KafkaExportITBase extends AccumuloExportITBase { private EmbeddedZookeeper zkServer; private ZkClient zkClient; - // The Rya instance statements are written to that will be fed into the Fluo app. + // The Rya instance that statements are written to; those statements are fed + // into the Fluo app. private RyaSailRepository ryaSailRepo = null; + private AccumuloRyaDAO dao = null; /** * Add info about the Kafka queue/topic to receive the export. @@ -104,7 +118,8 @@ public class KafkaExportITBase extends AccumuloExportITBase { observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); - // Configure the export observer to export new PCJ results to the mini accumulo cluster. + // Configure the export observer to export new PCJ results to the Kafka + // topic. final HashMap<String, String> exportParams = new HashMap<>(); final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); @@ -114,11 +129,29 @@ final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); kafkaParams.addAllProducerConfig(producerConfig); final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); observers.add(exportObserverConfig); + + // Create the construct query observer and configure it to export construct + // query results to Kafka + HashMap<String, String> constructParams = new HashMap<>(); + final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams); + kafkaConstructParams.setExportToKafka(true); + + // Configure the Kafka Producer + final Properties constructProducerConfig = new Properties(); + constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); + kafkaConstructParams.addAllProducerConfig(constructProducerConfig); + + final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), + constructParams); + observers.add(constructExportObserverConfig); // Add the observers to the Fluo Configuration. super.getFluoConfiguration().addObservers(observers); @@ -150,24 +183,24 @@ } @After - public void teardownRya() throws Exception { + public void teardownRya() { final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); final String instanceName = cluster.getInstanceName(); final String zookeepers = cluster.getZooKeepers(); // Uninstall the instance of Rya.
final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), + new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), super.getAccumuloConnector()); - ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); - - // Shutdown the repo. - ryaSailRepo.shutDown(); + try { + ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); + // Shutdown the repo. + if(ryaSailRepo != null) {ryaSailRepo.shutDown();} + if(dao != null ) {dao.destroy();} + } catch (Exception e) { + System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage()); + } } private void installRyaInstance() throws Exception { @@ -177,26 +210,18 @@ public class KafkaExportITBase extends AccumuloExportITBase { // Install the Rya instance to the mini accumulo cluster. final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), + new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), super.getAccumuloConnector()); - ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() - .setEnableTableHashPrefix(false) - .setEnableFreeTextIndex(false) - .setEnableEntityCentricIndex(false) - .setEnableGeoIndex(false) - .setEnableTemporalIndex(false) - .setEnablePcjIndex(true) - .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() ) - .build()); + ryaClient.getInstall().install(RYA_INSTANCE_NAME, + InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false) + .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true) + .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build()); // Connect to the Rya instance that was just installed. final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); final Sail sail = RyaSailFactory.getInstance(conf); + dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf); ryaSailRepo = new RyaSailRepository(sail); } @@ -211,15 +236,12 @@ public class KafkaExportITBase extends AccumuloExportITBase { conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); conf.setAuths(""); - // PCJ configuration information. conf.set(ConfigUtils.USE_PCJ, "true"); conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); conf.setDisplayQueryPlan(true); @@ -227,20 +249,29 @@ public class KafkaExportITBase extends AccumuloExportITBase { } /** - * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into. + * @return A {@link RyaSailRepository} that is connected to the Rya instance + * that statements are loaded into. 
*/ protected RyaSailRepository getRyaSailRepository() throws Exception { return ryaSailRepo; } /** + * @return An {@link AccumuloRyaDAO} so that RyaStatements with distinct + * visibilities can be added to the Rya instance + */ + protected AccumuloRyaDAO getRyaDAO() { + return dao; + } + + /** * Close the Kafka mini server and mini-zookeeper */ @After public void teardownKafka() { - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); + if(kafkaServer != null) {kafkaServer.shutdown();} + if(zkClient != null) {zkClient.close();} + if(zkServer != null) {zkServer.shutdown();} } /** @@ -257,7 +288,7 @@ // setup producer final Properties producerProps = new Properties(); producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps); @@ -266,7 +297,7 @@ consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); consumerProps.setProperty("group.id", "group0"); consumerProps.setProperty("client.id", "consumer0"); - consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); // to make sure the consumer starts from the beginning of the topic @@ -296,8 +327,10 @@ consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); // to make sure the consumer starts from the beginning of the topic consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -306,4 +339,32 @@ consumer.subscribe(Arrays.asList(TopicName)); return consumer; } + + protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); + + // Register the PCJ with Rya.
+ final Instance accInstance = super.getAccumuloConnector().getInstance(); + final Connector accumuloConn = super.getAccumuloConnector(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); + + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + + // Write the data to Rya. + final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); + ryaConn.begin(); + ryaConn.add(statements); + ryaConn.commit(); + ryaConn.close(); + + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); + + // The PCJ Id is the topic name the results will be written to. + return pcjId; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java index 84b6343..4eab0f6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java @@ -26,8 +26,10 @@ import org.apache.fluo.api.config.ObserverSpecification; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; @@ -68,6 +70,13 @@ public class RyaExportITBase extends FluoITBase { final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); observers.add(exportObserverConfig); + + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(false); + + final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), + exportParams); + observers.add(constructExportObserverConfig); // Add the observers to the Fluo Configuration. 
super.getFluoConfiguration().addObservers(observers); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index d56c23a..d19646e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase { final FluoQuery fluoQuery = report.getFluoQuery(); - final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId(); + final String queryNodeId = fluoQuery.getQueryMetadata().get().getNodeId(); expectedCounts.put(queryNodeId, BigInteger.valueOf(8)); final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index 082f46d..accabbf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -20,22 +20,30 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; +import java.util.List; + import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.junit.Test; import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; import org.openrdf.repository.RepositoryException; +import com.google.common.base.Optional; + /** * Integration tests the methods of {@link FluoQueryMetadataDAO}. 
*/ @@ -160,6 +168,42 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { assertEquals(originalMetadata, storedMetdata); } } + + @Test + public void constructQueryMetadataTest() throws MalformedQueryException { + + String query = "select ?x ?y where {?x <uri:p1> ?y. ?y <uri:p2> <uri:o1> }"; + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final ConstructQueryMetadata.Builder builder = ConstructQueryMetadata.builder(); + builder.setNodeId("nodeId"); + builder.setSparql(query); + builder.setChildNodeId("childNodeId"); + builder.setConstructGraph(new ConstructGraph(patterns)); + final ConstructQueryMetadata originalMetadata = builder.build(); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + ConstructQueryMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readConstructQueryMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } + } @Test public void aggregationMetadataTest_withGroupByVarOrders() { @@ -242,6 +286,9 @@ final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null); final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds()); + assertEquals(QueryType.Projection, originalQuery.getQueryType()); + assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent()); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Write it to the Fluo table. try(Transaction tx = fluoClient.newTransaction()) { @@ -249,12 +296,51 @@ tx.commit(); } - // Read it from the Fluo table. - FluoQuery storedQuery = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); + // Read it from the Fluo table. + FluoQuery storedQuery = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().get().getNodeId()); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalQuery, storedQuery); + } + } + + @Test + public void fluoConstructQueryTest() throws MalformedQueryException { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final String sparql = + "CONSTRUCT { ?customer <http://travelsTo> <http://England> . ?customer <http://friendsWith> ?worker }" + + "WHERE { " + + "FILTER(?customer = <http://Alice>) " + + "FILTER(?city = <http://London>) " + + "?customer <http://talksTo> ?worker. " + + "?worker <http://livesIn> ?city. " + + "?worker <http://worksAt> <http://Chipotle>. 
" + + "}"; + + final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null); + final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds()); + + assertEquals(QueryType.Construct, originalQuery.getQueryType()); + assertEquals(false, originalQuery.getQueryMetadata().isPresent()); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalQuery); + tx.commit(); } + // Read it from the Fluo table. + FluoQuery storedQuery = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedQuery = dao.readFluoQuery(sx, originalQuery.getConstructQueryMetadata().get().getNodeId()); + } + // Ensure the deserialized object is the same as the serialized one. assertEquals(originalQuery, storedQuery); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 349d391..414fa70 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -121,6 +121,7 @@ public class CreateDeleteIT extends RyaExportITBase { assertEquals(0, empty_rows.size()); } } + private String loadData(final String sparql, final Collection<Statement> statements) throws Exception { requireNonNull(sparql); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 7fa28ab..219e079 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -29,14 +29,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; @@ -47,7 +42,6 @@ import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.XMLSchema; 
import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; -import org.openrdf.repository.sail.SailRepositoryConnection; import com.google.common.collect.Sets; @@ -427,38 +421,10 @@ // Verify the end results of the query match the expected results. final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location")); assertEquals(expectedResults, results); } - private String loadData(final String sparql, final Collection<Statement> statements) throws Exception { - requireNonNull(sparql); - requireNonNull(statements); - - // Register the PCJ with Rya. - final Instance accInstance = super.getAccumuloConnector().getInstance(); - final Connector accumuloConn = super.getAccumuloConnector(); - - final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - accInstance.getInstanceName(), - accInstance.getZooKeepers()), accumuloConn); - - final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); - - // Write the data to Rya. - final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); - ryaConn.begin(); - ryaConn.add(statements); - ryaConn.commit(); - ryaConn.close(); - - // Wait for the Fluo application to finish computing the end result. - super.getMiniFluo().waitForObservers(); - - // The PCJ Id is the topic name the results will be written to. - return pcjId; - } private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception { requireNonNull(pcjId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java new file mode 100644 index 0000000..c8167c7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java @@ -0,0 +1,352 @@ +package org.apache.rya.indexing.pcj.fluo.integration; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils; +import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.google.common.collect.Sets; + +public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { + + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + + /** + * Add info about the Kafka queue/topic to receive the export. + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) + */ + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + + // Configure the export observer to export construct query results to the + // Kafka topic. 
+ final HashMap<String, String> exportParams = new HashMap<>(); + + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(true); + + // Configure the Kafka Producer + final Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); + kafkaParams.addAllProducerConfig(producerConfig); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), + exportParams); + observers.add(exportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + + @Test + public void basicConstructQuery() throws Exception { + // A CONSTRUCT query that builds travelsTo and friendsWith statements from each matching customer, worker, and city. + final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { " + + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:Joe"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:livesIn"), vf.createURI("urn:London")), + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:burgerShack"))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadStatements(sparql, statements); + + // Verify the end results of the query match the expected results. + final Set<RyaSubGraph> results = readAllResults(pcjId); + + final Set<RyaSubGraph> expectedResults = new HashSet<>(); + RyaSubGraph subGraph = new RyaSubGraph(pcjId); + RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London")); + RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob")); + // if no visibility indicated, then visibilities set to empty byte in + // Fluo - they are null by default in RyaStatement + // need to set visibility to empty byte so that RyaStatement's equals + // will return true + statement1.setColumnVisibility(new byte[0]); + statement2.setColumnVisibility(new byte[0]); + + Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement1, statement2)); + subGraph.setStatements(stmnts); + expectedResults.add(subGraph); + + ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results); + } + + @Test + public void basicConstructQueryWithVis() throws Exception { + // The same CONSTRUCT query, now fed statements that carry column visibilities. + final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { " + + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}"; + + // Create the Statements that will be loaded into Rya. 
+ RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob")); + RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London")); + RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack")); + statement1.setColumnVisibility("U&W".getBytes("UTF-8")); + statement2.setColumnVisibility("V".getBytes("UTF-8")); + statement3.setColumnVisibility("W".getBytes("UTF-8")); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3)); + + // Verify the end results of the query match the expected results. + final Set<RyaSubGraph> results = readAllResults(pcjId); + // Create the expected results of the SPARQL query once the PCJ has been + // computed. + final Set<RyaSubGraph> expectedResults = new HashSet<>(); + RyaSubGraph subGraph = new RyaSubGraph(pcjId); + RyaStatement statement4 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London")); + RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob")); + // the visibility of each constructed statement is the conjunction of the + // visibilities on the statements that derived it + statement4.setColumnVisibility("U&V&W".getBytes("UTF-8")); + statement5.setColumnVisibility("U&V&W".getBytes("UTF-8")); + + Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement4, statement5)); + subGraph.setStatements(stmnts); + expectedResults.add(subGraph); + + ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results); + } + + + @Test + public void constructQueryWithVisAndMultipleSubGraphs() throws Exception { + // The same CONSTRUCT query; two customer/worker matches should yield two subgraphs. + final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . ?customer <urn:friendsWith> ?worker }" + "WHERE { " + + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}"; + + // Create the Statements that will be loaded into Rya. + RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob")); + RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London")); + RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack")); + RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:talksTo"), new RyaURI("urn:Evan")); + RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco")); + RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack")); + statement1.setColumnVisibility("U&W".getBytes("UTF-8")); + statement2.setColumnVisibility("V".getBytes("UTF-8")); + statement3.setColumnVisibility("W".getBytes("UTF-8")); + statement4.setColumnVisibility("A&B".getBytes("UTF-8")); + statement5.setColumnVisibility("B".getBytes("UTF-8")); + statement6.setColumnVisibility("C".getBytes("UTF-8")); + + // Create the PCJ in Fluo and load the statements into Rya. 
+ final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3, statement4, statement5, statement6)); + + // Verify the end results of the query match the expected results. + final Set<RyaSubGraph> results = readAllResults(pcjId); + // Create the expected results of the SPARQL query once the PCJ has been + // computed. + RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London")); + RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob")); + RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco")); + RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan")); + statement7.setColumnVisibility("U&V&W".getBytes("UTF-8")); + statement8.setColumnVisibility("U&V&W".getBytes("UTF-8")); + statement9.setColumnVisibility("A&B&C".getBytes("UTF-8")); + statement10.setColumnVisibility("A&B&C".getBytes("UTF-8")); + + final Set<RyaSubGraph> expectedResults = new HashSet<>(); + + RyaSubGraph subGraph1 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, statement8)); + subGraph1.setStatements(stmnts1); + expectedResults.add(subGraph1); + + RyaSubGraph subGraph2 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, statement10)); + subGraph2.setStatements(stmnts2); + expectedResults.add(subGraph2); + + ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, results); + } + + @Test + public void constructQueryWithBlankNodesAndMultipleSubGraphs() throws Exception { + // The same CONSTRUCT query, but with a blank node as the subject of the constructed statements. + final String sparql = "CONSTRUCT { _:b <urn:travelsTo> ?city . _:b <urn:friendsWith> ?worker }" + "WHERE { " + + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> ?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}"; + + // Create the Statements that will be loaded into Rya. + RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:talksTo"), new RyaURI("urn:Bob")); + RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:livesIn"), new RyaURI("urn:London")); + RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack")); + RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:talksTo"), new RyaURI("urn:Evan")); + RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco")); + RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack")); + statement1.setColumnVisibility("U&W".getBytes("UTF-8")); + statement2.setColumnVisibility("V".getBytes("UTF-8")); + statement3.setColumnVisibility("W".getBytes("UTF-8")); + statement4.setColumnVisibility("A&B".getBytes("UTF-8")); + statement5.setColumnVisibility("B".getBytes("UTF-8")); + statement6.setColumnVisibility("C".getBytes("UTF-8")); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadRyaStatements(sparql, Arrays.asList(statement1, statement2, statement3, statement4, statement5, statement6)); + + // Verify the end results of the query match the expected results. 
+ final Set<RyaSubGraph> results = readAllResults(pcjId); + // Create the expected results of the SPARQL query once the PCJ has been + // computed. + RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:travelsTo"), new RyaURI("urn:London")); + RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Bob")); + RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco")); + RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan")); + statement7.setColumnVisibility("U&V&W".getBytes("UTF-8")); + statement8.setColumnVisibility("U&V&W".getBytes("UTF-8")); + statement9.setColumnVisibility("A&B&C".getBytes("UTF-8")); + statement10.setColumnVisibility("A&B&C".getBytes("UTF-8")); + + final Set<RyaSubGraph> expectedResults = new HashSet<>(); + + RyaSubGraph subGraph1 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, statement8)); + subGraph1.setStatements(stmnts1); + expectedResults.add(subGraph1); + + RyaSubGraph subGraph2 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, statement10)); + subGraph2.setStatements(stmnts2); + expectedResults.add(subGraph2); + + ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results); + } + + protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final String TopicName) { + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final KafkaConsumer<String, RyaSubGraph> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TopicName)); + return consumer; + } + + private Set<RyaSubGraph> readAllResults(final String pcjId) throws Exception { + requireNonNull(pcjId); + + // Read all of the results from the Kafka topic. 
+ final Set<RyaSubGraph> results = new HashSet<>(); + + try (final KafkaConsumer<String, RyaSubGraph> consumer = makeRyaSubGraphConsumer(pcjId)) { + final ConsumerRecords<String, RyaSubGraph> records = consumer.poll(5000); + final Iterator<ConsumerRecord<String, RyaSubGraph>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + results.add(recordIterator.next().value()); + } + } + + return results; + } + + protected String loadStatements(final String sparql, final Collection<Statement> statements) throws Exception { + return loadRyaStatements(sparql, statements.stream().map(x -> RdfToRyaConversions.convertStatement(x)).collect(Collectors.toSet())); + } + + + protected String loadRyaStatements(final String sparql, final Collection<RyaStatement> statements) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); + FluoClient client = null; + + try { + CreatePcj createPcj = new CreatePcj(); + client = new FluoClientImpl(super.getFluoConfiguration()); + FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql); + + AccumuloRyaDAO dao = getRyaDAO(); + dao.add(statements.iterator()); + + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); + + // FluoITHelper.printFluoTable(client); + return fluoQuery.getConstructQueryMetadata().get().getNodeId(); + } finally { + if (client != null) { + client.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java index cf9dfef..2fbe334 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java @@ -44,7 +44,6 @@ class FunctionAdapter implements Function { @Override public Value evaluate(ValueFactory valueFactory, Value... args) throws ValueExprEvaluationException { - System.out.println("Evaluate: Valuefactory=" + valueFactory); // need a Adapter for org.eclipse.rdf4j.model.ValueFactory org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = org.eclipse.rdf4j.model.impl.SimpleValueFactory.getInstance(); // org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = new ValueFactoryAdapter(valueFactory); @@ -61,7 +60,6 @@ class FunctionAdapter implements Function { org.eclipse.rdf4j.model.Literal vLiteral = (org.eclipse.rdf4j.model.Literal) v; org.openrdf.model.URI vType = valueFactory.createURI(vLiteral.getDatatype().stringValue()); org.openrdf.model.Literal theReturnValue = valueFactory.createLiteral(vLiteral.getLabel(), vType); - System.out.println("Function RETURNS:" + theReturnValue + " class:" + theReturnValue.getClass() + " rdf4j=" + v + " class:" + v.getClass()); return theReturnValue; } //
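For anyone wiring a downstream application to the new subgraph export, the consumer side mirrors the makeRyaSubGraphConsumer helper above. The following is a minimal, hypothetical sketch rather than shipped code: the broker address matches the embedded broker these ITs use, the group id and class name are placeholders, and the subscribed topic is assumed to be the construct query's node id (the value loadRyaStatements returns).

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.api.domain.RyaSubGraph;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;

// Hypothetical standalone consumer for the subgraphs that
// ConstructQueryResultObserver exports to Kafka.
public class SubGraphConsumerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Assumed broker address; the ITs run an embedded broker at 127.0.0.1:9092.
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "subgraph-group");
        // Keys are Strings and values are RyaSubGraphs, matching the producer
        // configuration the observer is given in preFluoInitHook.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName());
        // Start from the beginning of the topic so no exported result is missed.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, RyaSubGraph> consumer = new KafkaConsumer<>(props)) {
            // Placeholder topic name; in the ITs it is the construct query's node id.
            consumer.subscribe(Arrays.asList("<construct-query-node-id>"));
            for (final ConsumerRecord<String, RyaSubGraph> record : consumer.poll(5000)) {
                System.out.println(record.value());
            }
        }
    }
}

As in readAllResults above, a single poll(5000) suffices for the small result sets these tests produce; a long-lived consumer would poll in a loop instead.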
