http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java new file mode 100644 index 0000000..cb7557c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.application; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.apache.rya.periodic.notification.api.CreatePeriodicQuery; +import 
org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +public class PeriodicNotificationApplicationIT extends RyaExportITBase { + + private PeriodicNotificationApplication app; + private KafkaNotificationRegistrationClient registrar; + private KafkaProducer<String, CommandNotification> producer; + private Properties props; + private Properties kafkaProps; + PeriodicNotificationApplicationConfiguration conf; + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private ZkUtils zkUtils; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + @Before + public void init() throws Exception { + setUpKafka(); + props = getProps(); + conf = new PeriodicNotificationApplicationConfiguration(props); + 
kafkaProps = getKafkaProperties(conf); + app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); + producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); + registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); + } + + private void setUpKafka() throws Exception { + // Setup Kafka. + zkServer = new EmbeddedZookeeper(); + final String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Brokersparql + final Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + } + + @Test + public void periodicApplicationWithAggAndGroupByTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?type (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. 
" // n + + "?obs <uri:hasObsType> ?type } group by ?type"; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to makell + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("automobile"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + addData(statements); + app.start(); +// + Multimap<Long, BindingSet> actual = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + actual.put(binId, result); + } + } + + Map<Long, Set<BindingSet>> expected = new HashMap<>(); + + Set<BindingSet> expected1 = new HashSet<>(); + QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs1.addBinding("type", vf.createLiteral("airplane")); + + QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs2.addBinding("type", 
vf.createLiteral("ship")); + + QueryBindingSet bs3 = new QueryBindingSet(); + bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs3.addBinding("type", vf.createLiteral("automobile")); + + expected1.add(bs1); + expected1.add(bs2); + expected1.add(bs3); + + Set<BindingSet> expected2 = new HashSet<>(); + QueryBindingSet bs4 = new QueryBindingSet(); + bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); + bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs4.addBinding("type", vf.createLiteral("airplane")); + + QueryBindingSet bs5 = new QueryBindingSet(); + bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); + bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs5.addBinding("type", vf.createLiteral("ship")); + + expected2.add(bs4); + expected2.add(bs5); + + Set<BindingSet> expected3 = new HashSet<>(); + QueryBindingSet bs6 = new QueryBindingSet(); + bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); + bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs6.addBinding("type", vf.createLiteral("ship")); + + QueryBindingSet bs7 = new QueryBindingSet(); + bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); + bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs7.addBinding("type", vf.createLiteral("airplane")); + + expected3.add(bs6); + expected3.add(bs7); + + expected.put(ids.get(0), expected1); + expected.put(ids.get(1), expected2); + expected.put(ids.get(2), expected3); + + Assert.assertEquals(3, actual.asMap().size()); + for(Long ident: ids) { + Assert.assertEquals(expected.get(ident), actual.get(ident)); + } + } + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, 
Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + } + + + @Test + public void periodicApplicationWithAggTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } "; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to make + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + addData(statements); + app.start(); +// + Multimap<Long, BindingSet> expected = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + expected.put(binId, result); + } + } + + Assert.assertEquals(3, expected.asMap().size()); + int i = 0; + for(Long ident: ids) { + Assert.assertEquals(1, expected.get(ident).size()); + BindingSet bs = expected.get(ident).iterator().next(); + Value val = bs.getValue("total"); + int total = Integer.parseInt(val.stringValue()); + Assert.assertEquals(3-i, total); + i++; + } + } + + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + + } + + + @Test 
+ public void periodicApplicationTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?obs ?id where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } "; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to make + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector 
connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + addData(statements); + app.start(); +// + Multimap<Long, BindingSet> expected = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + expected.put(binId, result); + } + } + + Assert.assertEquals(3, expected.asMap().size()); + int i = 0; + for(Long ident: ids) { + Assert.assertEquals(3-i, expected.get(ident).size()); + i++; + } + } + + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + + } + + + @After + public void shutdown() { + registrar.close(); + app.stop(); + teardownKafka(); + } + + private void teardownKafka() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + private void addData(Collection<Statement> statements) throws DatatypeConfigurationException { + // add statements to Fluo + try (FluoClient fluo = new 
FluoClientImpl(getFluoConfiguration())) { + InsertTriples inserter = new InsertTriples(); + statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); + getMiniFluo().waitForObservers(); +// FluoITHelper.printFluoTable(fluo); + } + + } + + private Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers()); + kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId()); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return kafkaProps; + } + + + private Properties getProps() throws IOException { + + Properties props = new Properties(); + try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) { + props.load(in); + } + + FluoConfiguration fluoConf = getFluoConfiguration(); + props.setProperty("accumulo.user", getUsername()); + props.setProperty("accumulo.password", getPassword()); + props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName()); + props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers()); + props.setProperty("accumulo.rya.prefix", getRyaInstanceName()); + props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName()); + props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable()); + return props; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java new file mode 100644 index 0000000..1902248 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.application; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; + +import org.junit.Assert; + +public class PeriodicNotificationProviderIT extends AccumuloExportITBase { + + @Test + public void testProvider() throws MalformedQueryException, InterruptedException { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. 
" // n + + "?obs <uri:hasId> ?id } group by ?id"; // n + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications); + PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); + CreatePcj pcj = new CreatePcj(); + + String id = null; + try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { + id = pcj.createPcj(sparql, fluo); + provider.processRegisteredNotifications(coord, fluo.newSnapshot()); + } + + TimestampedNotification notification = notifications.take(); + Assert.assertEquals(5000, notification.getInitialDelay()); + Assert.assertEquals(15000, notification.getPeriod()); + Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit()); + Assert.assertEquals(id, notification.getId()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java new file mode 100644 index 0000000..c0efc4f --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.periodic.notification.exporter;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.kafka.base.KafkaITBase;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.junit.Assert;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;

/**
 * Integration test for {@link KafkaExporterExecutor}: binding-set records
 * placed on the executor's work queue should be published to the Kafka topic
 * each record names, and nothing else.
 */
public class PeriodicNotificationExporterIT extends KafkaITBase {

    private static final ValueFactory vf = new ValueFactoryImpl();

    @Test
    public void testExporter() throws InterruptedException {

        final BlockingQueue<BindingSetRecord> workQueue = new LinkedBlockingQueue<>();
        final Properties props = createKafkaConfig();

        final KafkaExporterExecutor exporter =
                new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(props), 1, workQueue);
        exporter.start();

        // Two records bound for two distinct topics.
        final QueryBindingSet bobBindings = new QueryBindingSet();
        bobBindings.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L));
        bobBindings.addBinding("name", vf.createURI("uri:Bob"));
        final BindingSetRecord bobRecord = new BindingSetRecord(bobBindings, "topic1");

        final QueryBindingSet joeBindings = new QueryBindingSet();
        joeBindings.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L));
        joeBindings.addBinding("name", vf.createURI("uri:Joe"));
        final BindingSetRecord joeRecord = new BindingSetRecord(joeBindings, "topic2");

        workQueue.add(bobRecord);
        workQueue.add(joeRecord);

        // Each topic should receive exactly its own binding set.
        final Set<BindingSet> expectedTopic1 = new HashSet<>();
        expectedTopic1.add(bobBindings);
        final Set<BindingSet> expectedTopic2 = new HashSet<>();
        expectedTopic2.add(joeBindings);

        Assert.assertEquals(expectedTopic1, getBindingSetsFromKafka("topic1"));
        Assert.assertEquals(expectedTopic2, getBindingSetsFromKafka("topic2"));

        exporter.stop();

    }


    /**
     * Combined producer/consumer config for the embedded broker; consumers
     * always start from the earliest offset so the test never misses records.
     */
    private Properties createKafkaConfig() {
        final Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
        return props;
    }


    /** Builds a consumer already subscribed to the given topic. */
    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) {
        final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConfig());
        consumer.subscribe(Arrays.asList(topicName));
        return consumer;
    }

    /**
     * Polls the given topic for up to 5 seconds and returns every binding set
     * received; any failure is rethrown unchecked with its cause preserved.
     */
    private Set<BindingSet> getBindingSetsFromKafka(final String topic) {
        try (KafkaConsumer<String, BindingSet> consumer = makeBindingSetConsumer(topic)) {
            final ConsumerRecords<String, BindingSet> records = consumer.poll(5000);
            final Set<BindingSet> bindingSets = new HashSet<>();
            records.forEach(record -> bindingSets.add(record.value()));
            return bindingSets;
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java new file mode 100644 index 0000000..fa60e48 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.processor; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class PeriodicNotificationProcessorIT extends AccumuloExportITBase { + + private static final ValueFactory vf = new ValueFactoryImpl(); + private static final String RYA_INSTANCE_NAME = "rya_"; + + @Test + public void 
periodicProcessorTest() throws Exception { + + String id = UUID.randomUUID().toString().replace("-", ""); + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); + BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); + + TimestampedNotification ts1 = new TimestampedNotification( + PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); + long binId1 = (ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod(); + + Thread.sleep(2000); + + TimestampedNotification ts2 = new TimestampedNotification( + PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); + long binId2 = (ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod(); + + Set<NodeBin> expectedBins = new HashSet<>(); + expectedBins.add(new NodeBin(id, binId1)); + expectedBins.add(new NodeBin(id, binId2)); + + Set<BindingSet> expected = new HashSet<>(); + Set<VisibilityBindingSet> storageResults = new HashSet<>(); + + QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding("periodicBinId", vf.createLiteral(binId1)); + bs1.addBinding("id", vf.createLiteral(1)); + expected.add(bs1); + storageResults.add(new VisibilityBindingSet(bs1)); + + QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("periodicBinId", vf.createLiteral(binId1)); + bs2.addBinding("id", vf.createLiteral(2)); + expected.add(bs2); + storageResults.add(new VisibilityBindingSet(bs2)); + + QueryBindingSet bs3 = new QueryBindingSet(); + bs3.addBinding("periodicBinId", vf.createLiteral(binId2)); + bs3.addBinding("id", vf.createLiteral(3)); + expected.add(bs3); + storageResults.add(new VisibilityBindingSet(bs3)); + + QueryBindingSet bs4 = new QueryBindingSet(); + bs4.addBinding("periodicBinId", vf.createLiteral(binId2)); + bs4.addBinding("id", vf.createLiteral(4)); + expected.add(bs4); + storageResults.add(new 
VisibilityBindingSet(bs4)); + + PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), + RYA_INSTANCE_NAME); + periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id")); + periodicStorage.addPeriodicQueryResults(id, storageResults); + + NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1); + processor.start(); + + notifications.add(ts1); + notifications.add(ts2); + + Thread.sleep(5000); + + Assert.assertEquals(expectedBins.size(), bins.size()); + Assert.assertEquals(true, bins.containsAll(expectedBins)); + + Set<BindingSet> actual = new HashSet<>(); + bindingSets.forEach(x -> actual.add(x.getBindingSet())); + Assert.assertEquals(expected, actual); + + processor.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java new file mode 100644 index 0000000..27acc9c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.xml.datatype.DatatypeFactory; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.recipes.test.FluoITHelper; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import 
org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.apache.rya.periodic.notification.api.CreatePeriodicQuery; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Sets; + +public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { + + + @Test + public void periodicPrunerTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. 
" // n + + "?obs <uri:hasId> ?id } group by ?id"; // n + + FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration()); + + // initialize resources and create pcj + PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), + getRyaInstanceName()); + CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage); + PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql); + String queryId = notification.getId(); + + // create statements to ingest into Fluo + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), 
vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2"))); + + // add statements to Fluo + InsertTriples inserter = new InsertTriples(); + statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); + + super.getMiniFluo().waitForObservers(); + + // FluoITHelper.printFluoTable(fluo); + + // Create the expected results of the SPARQL query once the PCJ has been + // computed. 
+ final Set<BindingSet> expected1 = new HashSet<>(); + final Set<BindingSet> expected2 = new HashSet<>(); + final Set<BindingSet> expected3 = new HashSet<>(); + final Set<BindingSet> expected4 = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime / period) * period; + + long bin1 = binId; + long bin2 = binId + period; + long bin3 = binId + 2 * period; + long bin4 = binId + 3 * period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + 
bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin3)); + expected3.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin3)); + expected3.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin4)); + expected4.add(bs); + + // make sure that expected and actual results align after ingest + compareResults(periodicStorage, queryId, bin1, expected1); + compareResults(periodicStorage, queryId, bin2, expected2); + compareResults(periodicStorage, queryId, bin3, expected3); + compareResults(periodicStorage, queryId, bin4, expected4); + + BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); + PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins); + pruner.start(); + + bins.add(new NodeBin(queryId, bin1)); + bins.add(new NodeBin(queryId, bin2)); + bins.add(new NodeBin(queryId, bin3)); + bins.add(new NodeBin(queryId, bin4)); + + Thread.sleep(10000); + + compareResults(periodicStorage, queryId, bin1, new HashSet<>()); + compareResults(periodicStorage, queryId, bin2, new HashSet<>()); + compareResults(periodicStorage, queryId, bin3, new HashSet<>()); + compareResults(periodicStorage, queryId, bin4, new HashSet<>()); + + compareFluoCounts(fluo, queryId, bin1); + compareFluoCounts(fluo, queryId, bin2); + compareFluoCounts(fluo, queryId, bin3); + compareFluoCounts(fluo, queryId, 
bin4); + + pruner.stop(); + + } + + private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception { + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) { + Set<BindingSet> actual = new HashSet<>(); + while(iter.hasNext()) { + actual.add(iter.next()); + } + Assert.assertEquals(expected, actual); + } + } + + private void compareFluoCounts(FluoClient client, String queryId, long bin) { + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG)); + + VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID); + + try(Snapshot sx = client.newSnapshot()) { + String fluoQueryId = sx.get(Bytes.of(queryId), FluoQueryColumns.PCJ_ID_QUERY_ID).toString(); + Set<String> ids = new HashSet<>(); + PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids); + for(String id: ids) { + NodeType optNode = NodeType.fromNodeId(id).orNull(); + if(optNode == null) throw new RuntimeException("Invalid NodeType."); + Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs); + RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build(); + int count = 0; + Iterator<ColumnScanner> colScannerIter = scanner.iterator(); + while(colScannerIter.hasNext()) { + ColumnScanner colScanner = colScannerIter.next(); + String row = colScanner.getRow().toString(); + Iterator<ColumnValue> values = colScanner.iterator(); + while(values.hasNext()) { + values.next(); + count++; + } + } + Assert.assertEquals(0, count); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java 
---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java new file mode 100644 index 0000000..bde406f --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.log4j.BasicConfigurator; +import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase { + + private static final String topic = "topic"; + private KafkaNotificationRegistrationClient registration; + private PeriodicNotificationCoordinatorExecutor coord; + private KafkaNotificationProvider provider; + + @Test + public void kafkaNotificationProviderTest() throws InterruptedException { + + BasicConfigurator.configure(); + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + Properties props = createKafkaConfig(); + KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + registration = new KafkaNotificationRegistrationClient(topic, producer); + coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); + provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); + provider.start(); + 
+ registration.addNotification("1", 1, 0, TimeUnit.SECONDS); + Thread.sleep(4000); + // check that notifications are being added to the blocking queue + Assert.assertEquals(true, notifications.size() > 0); + + registration.deleteNotification("1"); + Thread.sleep(2000); + int size = notifications.size(); + // sleep for 2 seconds to ensure no more messages being produced + Thread.sleep(2000); + Assert.assertEquals(size, notifications.size()); + + tearDown(); + } + + @Test + public void kafkaNotificationMillisProviderTest() throws InterruptedException { + + BasicConfigurator.configure(); + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + Properties props = createKafkaConfig(); + KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + registration = new KafkaNotificationRegistrationClient(topic, producer); + coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); + provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); + provider.start(); + + registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS); + Thread.sleep(4000); + // check that notifications are being added to the blocking queue + Assert.assertEquals(true, notifications.size() > 0); + + registration.deleteNotification("1"); + Thread.sleep(2000); + int size = notifications.size(); + // sleep for 2 seconds to ensure no more messages being produced + Thread.sleep(2000); + Assert.assertEquals(size, notifications.size()); + + tearDown(); + } + + private void tearDown() { + registration.close(); + provider.stop(); + coord.stop(); + } + + private Properties createKafkaConfig() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + + return props; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties new file mode 100644 index 0000000..4b25b93 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#/ +accumulo.auths= +accumulo.instance="instance" +accumulo.user="root" +accumulo.password="secret" +accumulo.rya.prefix="rya_" +accumulo.zookeepers= +fluo.app.name="fluo_app" +fluo.table.name="fluo_table" +kafka.bootstrap.servers=127.0.0.1:9092 +kafka.notification.topic=notifications +kafka.notification.client.id=consumer0 +kafka.notification.group.id=group0 +cep.coordinator.threads=1 +cep.producer.threads=1 +cep.exporter.threads=1 +cep.processor.threads=1 +cep.pruner.threads=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml new file mode 100644 index 0000000..2173888 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml @@ -0,0 +1,107 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to you under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for + the specific language governing permissions and limitations under the License. --> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.service.notification</artifactId> + + <name>Apache Rya Periodic Service Notification</name> + <description>Notifications for Rya Periodic Service</description> + + <dependencies> + + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-api</artifactId> + <version>0.11.0</version> + </dependency> + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-yarn</artifactId> + <version>0.11.0</version> + <exclusions> + <exclusion> + <artifactId>kafka_2.10</artifactId> + <groupId>org.apache.kafka</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.0</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-query</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing.pcj</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.app</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <encoding>UTF-8</encoding> + <source>1.8</source> + 
<target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java new file mode 100644 index 0000000..571ee1c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.api; + +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.openrdf.query.Binding; + +/** + * Object that cleans up old {@link BindingSet}s corresponding to the specified + * {@link NodeBin}. This class deletes all BindingSets with the bin + * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given + * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} + * and value equal to the given bin. + * + */ +public interface BinPruner { + + /** + * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}. + * @param bin - NodeBin that indicates which BindingSets to delete.. + */ + public void pruneBindingSetBin(NodeBin bin); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java new file mode 100644 index 0000000..500a435 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; + +/** + * An Object that is used to export {@link BindingSet}s to an external repository or queuing system. + * + */ +public interface BindingSetExporter { + + /** + * This method exports the BindingSet to the external repository or queuing system + * that this BindingSetExporter is configured to export to. + * @param bindingSet - {@link BindingSet} to be exported + * @throws ResultExportException + */ + public void exportNotification(BindingSetRecord bindingSet) throws ResultExportException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java new file mode 100644 index 0000000..7f71b52 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import java.util.Optional; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.evaluation.function.Function; + +/** + * Object that creates a Periodic Query. A Periodic Query is any query + * requesting periodic updates about events that occurred within a given + * window of time of this instant. This is also known as a rolling window + * query. Period Queries can be expressed using SPARQL by including the + * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI} + * in the query. 
The user must provide this Function with the following arguments: + * the temporal variable in the query that will be filtered on, the window of time + * that events must occur within, the period at which the user wants to receive updates, + * and the time unit. The following query requests all observations that occurred + * within the last minute and requests updates every 15 seconds. It also performs + * a count on those observations. + * <li> + * <li> prefix function: http://org.apache.rya/function# + * <li> "prefix time: http://www.w3.org/2006/time# + * <li> "select (count(?obs) as ?total) where { + * <li> "Filter(function:periodic(?time, 1, .25, time:minutes)) + * <li> "?obs uri:hasTime ?time. + * <li> "?obs uri:hasId ?id } + * <li> + * + * This class is responsible for taking a Periodic Query expressed as a SPARQL query + * and adding to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}. + */ +public class CreatePeriodicQuery { + + private FluoClient fluoClient; + private PeriodicQueryResultStorage periodicStorage; + Function funciton; + PeriodicQueryUtil util; + + + public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) { + this.fluoClient = fluoClient; + this.periodicStorage = periodicStorage; + } + + /** + * Creates a Periodic Query by adding the query to Fluo and using the resulting + * Fluo id to create a {@link PeriodicQueryResultStorage} table. + * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table + * @return PeriodicNotification that can be used to register register this query with the {@link PeriodicNotificationApplication}. 
+ */ + public PeriodicNotification createPeriodicQuery(String sparql) { + try { + Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); + if(optNode.isPresent()) { + PeriodicQueryNode periodicNode = optNode.get(); + CreatePcj createPcj = new CreatePcj(); + String queryId = createPcj.createPcj(sparql, fluoClient); + periodicStorage.createPeriodicQuery(queryId, sparql); + PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod()) + .timeUnit(periodicNode.getUnit()).build(); + return notification; + } else { + throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); + } + } catch (MalformedQueryException | PeriodicQueryStorageException e) { + throw new RuntimeException(e); + } + } + + /** + * Creates a Periodic Query by adding the query to Fluo and using the resulting + * Fluo id to create a {@link PeriodicQueryResultStorage} table. In addition, this + * method registers the PeriodicQuery with the PeriodicNotificationApplication to poll + * the PeriodicQueryResultStorage table at regular intervals and export results to Kafka. + * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of + * the PeriodicQuery. 
+ * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table + * @param PeriodicNotificationClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication} + * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same) + */ + public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) { + PeriodicNotification notification = createPeriodicQuery(sparql); + periodicClient.addNotification(notification); + return notification.getId(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java new file mode 100644 index 0000000..b1e8bad --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Interface providing basic life cycle functionality,
 * including stopping and starting any class implementing this
 * interface and checking whether it is running.
 */
public interface LifeCycle {

    /**
     * Starts the application.
     */
    void start();

    /**
     * Stops a running application.
     */
    void stop();

    /**
     * Determine if the application is currently running.
     *
     * @return true if the application is running and false otherwise.
     */
    boolean currentlyRunning();

}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import java.util.Objects; + +/** + * Object used to indicate the id of a given Periodic Query + * along with a particular bin of results. This Object is used + * by the {@link BinPruner} to clean up old query results after + * they have been processed. + * + */ +public class NodeBin { + + private long bin; + private String nodeId; + + public NodeBin(String nodeId, long bin) { + this.bin = bin; + this.nodeId = nodeId; + } + + /** + * @return id of Periodic Query + */ + public String getNodeId() { + return nodeId; + } +/** + * @return bin id of results for a given Periodic Query + */ + public long getBin() { + return bin; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof NodeBin) { + NodeBin bin = (NodeBin) other; + return this.bin == bin.bin && this.nodeId.equals(bin.nodeId); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(bin, nodeId); + } + + @Override + public String toString() { + return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString(); + } + +}
