http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java new file mode 100644 index 0000000..9724704 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueriesIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Transaction; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries.FluoQueryStringBuilder; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + + +public class ListFluoQueriesIT extends RyaExportITBase { + + @Test + public void queryMetadataTest() throws Exception { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + String sparql1 = "select ?x ?y ?z where {?x <uri:p1> ?y; <uri:p2> 'literal1'. ?y <uri:p3> ?z }"; + String sparql2 = "select ?x ?y ?z where {{select ?x ?y ?z {?x <uri:p1> ?y; <uri:p2> ?z. ?y <uri:p3> ?z }}}"; + + // Create the object that will be serialized. + String queryId1 = NodeType.generateNewFluoIdForType(NodeType.QUERY); + final QueryMetadata.Builder builder = QueryMetadata.builder(queryId1); + builder.setQueryType(QueryType.PROJECTION); + builder.setVarOrder(new VariableOrder("y;s;d")); + builder.setSparql(sparql1); + builder.setChildNodeId("childNodeId"); + builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.KAFKA))); + final QueryMetadata meta1 = builder.build(); + + String queryId2 = NodeType.generateNewFluoIdForType(NodeType.QUERY); + final QueryMetadata.Builder builder2 = QueryMetadata.builder(queryId2); + builder2.setQueryType(QueryType.PROJECTION); + builder2.setVarOrder(new VariableOrder("y;s;d")); + builder2.setSparql(sparql2); + builder2.setChildNodeId("childNodeId"); + builder2.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.RYA))); + final QueryMetadata meta2 = builder2.build(); + + try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try (Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, meta1); + dao.write(tx, meta2); + tx.commit(); + } + ListFluoQueries listFluoQueries = new ListFluoQueries(); + List<String> queries = listFluoQueries.listFluoQueries(fluoClient); + + FluoQueryStringBuilder queryBuilder1 = new FluoQueryStringBuilder(); + String expected1 = queryBuilder1.setQueryId(queryId1).setQueryType(QueryType.PROJECTION).setQuery(sparql1) + .setExportStrategies(Sets.newHashSet(ExportStrategy.KAFKA)).build(); + + FluoQueryStringBuilder queryBuilder2 = new FluoQueryStringBuilder(); + String expected2 = queryBuilder2.setQueryId(queryId2).setQueryType(QueryType.PROJECTION).setQuery(sparql2) + .setExportStrategies(Sets.newHashSet(ExportStrategy.RYA)).build(); + + Set<String> expected = new HashSet<>(); + expected.add(expected1); + expected.add(expected2); + + Assert.assertEquals(expected, Sets.newHashSet(queries)); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java index 47a2f29..66aa04b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java @@ -90,7 +90,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ. String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); @@ -130,7 +130,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ. String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(2); @@ -176,7 +176,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ. String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(2); @@ -225,7 +225,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); @@ -264,7 +264,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); @@ -305,7 +305,7 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()); + getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index a1d76cb..27b8222 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -33,7 +33,6 @@ import org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; -import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj; @@ -131,7 +130,7 @@ public class CreateDeleteIT extends RyaExportITBase { // Register the PCJ with Rya. final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector()); - final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet(ExportStrategy.NO_OP_EXPORT)); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet()); // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java new file mode 100644 index 0000000..e61104a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import javax.xml.datatype.DatatypeFactory; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Span; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.google.common.collect.Sets; + +public class CreateDeletePeriodicPCJ extends KafkaExportITBase { + + @Test + public void deletePeriodicPCJ() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id }"; // n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))); + + runTest(query, statements, 29); + + } + + + + private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception { + try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + + String topic = "notification_topic"; + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), RYA_INSTANCE_NAME); + PeriodicNotificationClient notificationClient = new KafkaNotificationRegistrationClient(topic, + getNotificationProducer("localhost:9092")); + + CreatePeriodicQuery periodicPCJ = new CreatePeriodicQuery(fluoClient, storage); + String id = periodicPCJ.createPeriodicQuery(query, notificationClient).getQueryId(); + + loadData(statements); + + // Ensure the data was loaded. + final List<Bytes> rows = getFluoTableEntries(fluoClient); + assertEquals(expectedEntries, rows.size()); + + DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage); + deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient); + + // Ensure all data related to the query has been removed. + final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); + assertEquals(0, empty_rows.size()); + + // Ensure that Periodic Service notified to add and delete PeriodicNotification + Set<CommandNotification> notifications; + try (KafkaConsumer<String, CommandNotification> consumer = makeNotificationConsumer(topic)) { + notifications = getKafkaNotifications(topic, 7000, consumer); + } + assertEquals(2, notifications.size()); + + String notificationId = ""; + boolean addCalled = false; + boolean deleteCalled = false; + for (CommandNotification notification : notifications) { + if (notificationId.length() == 0) { + notificationId = notification.getId(); + } else { + assertEquals(notificationId, notification.getId()); + } + + if (notification.getCommand() == Command.ADD) { + addCalled = true; + } + + if (notification.getCommand() == Command.DELETE) { + deleteCalled = true; + } + } + + assertEquals(true, addCalled); + assertEquals(true, deleteCalled); + } + } + + private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) { + try (Snapshot snapshot = fluoClient.newSnapshot()) { + final List<Bytes> rows = new ArrayList<>(); + final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build(); + + for (final ColumnScanner cscanner : rscanner) { + rows.add(cscanner.getRow()); + } + + return rows; + } + } + + private KafkaProducer<String, CommandNotification> getNotificationProducer(String bootStrapServers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private KafkaConsumer<String, CommandNotification> makeNotificationConsumer(final String topic) { + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(topic)); + return consumer; + } + + private Set<CommandNotification> getKafkaNotifications(String topic, int pollTime, + KafkaConsumer<String, CommandNotification> consumer) { + requireNonNull(topic); + + // Read all of the results from the Kafka topic. + final Set<CommandNotification> results = new HashSet<>(); + + final ConsumerRecords<String, CommandNotification> records = consumer.poll(pollTime); + final Iterator<ConsumerRecord<String, CommandNotification>> recordIterator = records.iterator(); + while (recordIterator.hasNext()) { + results.add(recordIterator.next().value()); + } + + return results; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 8911f56..dbedfb3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -90,7 +90,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); FluoITHelper.printFluoTable(super.getFluoConfiguration()); @@ -136,7 +136,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -163,7 +163,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -194,7 +194,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(3.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -221,7 +221,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:count"), vf.createLiteral(2))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -248,7 +248,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(8))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { FluoITHelper.printFluoTable(fluo); @@ -280,7 +280,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -307,7 +307,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(2.75))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -338,7 +338,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:banana"), vf.createURI("urn:price"), vf.createLiteral(1.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); @@ -399,7 +399,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); @@ -477,7 +477,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); @@ -554,7 +554,7 @@ public class KafkaExportIT extends KafkaExportITBase { vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); // Create the PCJ in Fluo and load the statements into Rya. - final String pcjId = loadData(sparql, statements); + final String pcjId = loadDataAndCreateQuery(sparql, statements); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 0aefaca..4974aee 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -70,10 +70,6 @@ import com.google.common.collect.Sets; */ public class QueryIT extends RyaExportITBase { - private enum ExporterType { - Pcj, Periodic - }; - @Test public void optionalStatements() throws Exception { // A query that has optional statement patterns. This query is looking for all @@ -112,7 +108,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } /** @@ -187,7 +183,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } @Test @@ -241,7 +237,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } @Test @@ -278,7 +274,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } @Test @@ -359,7 +355,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } @Test @@ -424,7 +420,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults, ExporterType.Pcj); + runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } @Test @@ -525,7 +521,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } @Test @@ -596,7 +592,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } @Test @@ -713,7 +709,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } @@ -792,7 +788,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } @Test @@ -876,7 +872,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } @Test(expected= UnsupportedQueryException.class) @@ -896,11 +892,11 @@ public class QueryIT extends RyaExportITBase { final Set<BindingSet> expectedResults = new HashSet<>(); // Verify the end results of the query match the expected results. - runTest(query, statements, expectedResults, ExporterType.Periodic); + runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults, - ExporterType type) throws Exception { + ExportStrategy strategy) throws Exception { requireNonNull(sparql); requireNonNull(statements); requireNonNull(expectedResults); @@ -910,8 +906,8 @@ public class QueryIT extends RyaExportITBase { final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn); - switch (type) { - case Pcj: + switch (strategy) { + case RYA: ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); addStatementsAndWait(statements); // Fetch the value that is stored within the PCJ table. @@ -922,11 +918,11 @@ public class QueryIT extends RyaExportITBase { assertEquals(expectedResults, results); } break; - case Periodic: + case PERIODIC: PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName()); String periodicId = periodicStorage.createPeriodicQuery(sparql); try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { - new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo); + new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo); } addStatementsAndWait(statements); @@ -938,6 +934,8 @@ public class QueryIT extends RyaExportITBase { } assertEquals(expectedResults, results); break; + default: + throw new RuntimeException("Invalid export option"); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index ed9ce60..59fe54f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -321,7 +321,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { return consumer; } - protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception { + protected String loadDataAndCreateQuery(final String sparql, final Collection<Statement> statements) throws Exception { requireNonNull(sparql); requireNonNull(statements); @@ -334,7 +334,16 @@ public class KafkaExportITBase extends AccumuloExportITBase { final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA)); - // Write the data to Rya. + loadData(statements); + + // The PCJ Id is the topic name the results will be written to. + return pcjId; + } + + protected void loadData(final Collection<Statement> statements) throws Exception { + + requireNonNull(statements); + final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); ryaConn.begin(); ryaConn.add(statements); @@ -343,9 +352,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { // Wait for the Fluo application to finish computing the end result. super.getMiniFluo().waitForObservers(); - - // The PCJ Id is the topic name the results will be written to. - return pcjId; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/.gitignore ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/.gitignore b/extras/rya.periodic.service/periodic.service.api/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/.gitignore @@ -0,0 +1 @@ +/target/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/pom.xml b/extras/rya.periodic.service/periodic.service.api/pom.xml new file mode 100644 index 0000000..b57beaf --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/pom.xml @@ -0,0 +1,52 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to you under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.service.api</artifactId> + + <name>Apache Rya Periodic Service API</name> + <description>API for Periodic Service Application</description> + + <dependencies> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-query</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing.pcj</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java new file mode 100644 index 0000000..f4a083c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; + +/** + * Object that cleans up old {@link BindingSet}s corresponding to the specified + * {@link NodeBin}. This class deletes all BindingSets with the bin + * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given + * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} + * and value equal to the given bin. + * + */ +public interface BinPruner { + + /** + * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}. + * @param bin - NodeBin that indicates which BindingSets to delete.. + */ + public void pruneBindingSetBin(NodeBin bin); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java new file mode 100644 index 0000000..491576b --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import org.openrdf.query.BindingSet; + +/** + * An Object that is used to export {@link BindingSet}s to an external repository or queuing system. + * + */ +public interface BindingSetExporter { + + /** + * This method exports the BindingSet to the external repository or queuing system + * that this BindingSetExporter is configured to export to. + * @param bindingSet - {@link BindingSet} to be exported + * @throws ResultExportException + */ + public void exportNotification(BindingSetRecord bindingSet) throws BindingSetRecordExportException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java new file mode 100644 index 0000000..c3f70f1 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecord.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import org.openrdf.query.BindingSet; + +import com.google.common.base.Objects; + +/** + * Object that associates a {@link BindingSet} with a given Kafka topic. + * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export + * each BindingSet to its appropriate topic. + * + */ +public class BindingSetRecord { + + private BindingSet bs; + private String topic; + + public BindingSetRecord(BindingSet bs, String topic) { + this.bs = bs; + this.topic = topic; + } + + /** + * @return BindingSet in this BindingSetRecord + */ + public BindingSet getBindingSet() { + return bs; + } + + /** + * @return Kafka topic for this BindingSetRecord + */ + public String getTopic() { + return topic; + } + + @Override + public boolean equals(Object o) { + if(this == o) { + return true; + } + + if(o instanceof BindingSetRecord) { + BindingSetRecord record = (BindingSetRecord) o; + return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(bs, topic); + } + + @Override + public String toString() { + return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n") + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java new file mode 100644 index 0000000..94e4980 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/BindingSetRecordExportException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +/** + * A result could not be exported. + */ +public class BindingSetRecordExportException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link BindingSetRecordExportException}. + * + * @param message - Explains why the exception was thrown. + */ + public BindingSetRecordExportException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link BindingSetRecordExportException}. + * + * @param message - Explains why the exception was thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public BindingSetRecordExportException(final String message, final Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java new file mode 100644 index 0000000..b1e8bad --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +/** + * Interface providing basic life cycle functionality, + * including stopping and starting any class implementing this + * interface and checking whether is it running. + * + */ +public interface LifeCycle { + + /** + * Starts a running application. + */ + public void start(); + + /** + * Stops a running application. + */ + public void stop(); + + /** + * Determine if application is currently running. + * @return true if application is running and false otherwise. + */ + public boolean currentlyRunning(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java new file mode 100644 index 0000000..3ed7979 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import java.util.Objects; + +/** + * Object used to indicate the id of a given Periodic Query + * along with a particular bin of results. This Object is used + * by the {@link BinPruner} to clean up old query results after + * they have been processed. + * + */ +public class NodeBin { + + private long bin; + private String nodeId; + + public NodeBin(String nodeId, long bin) { + this.bin = bin; + this.nodeId = nodeId; + } + + /** + * @return id of Periodic Query + */ + public String getNodeId() { + return nodeId; + } +/** + * @return bin id of results for a given Periodic Query + */ + public long getBin() { + return bin; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof NodeBin) { + NodeBin bin = (NodeBin) other; + return this.bin == bin.bin && this.nodeId.equals(bin.nodeId); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(bin, nodeId); + } + + @Override + public String toString() { + return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java new file mode 100644 index 0000000..3e9e0d1 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/Notification.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +/** + * Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic + * Query with the indicated id. + * + */ +public interface Notification { + + /** + * @return id of a Periodic Query + */ + public String getId(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java new file mode 100644 index 0000000..d53dc17 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.rya.periodic.notification.notification.CommandNotification; + +/** + * Object that manages the periodic notifications for the Periodic Query Service. + * This Object processes new requests for periodic updates by registering them with + * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}). + * + */ +public interface NotificationCoordinatorExecutor extends LifeCycle { + + /** + * Registers or deletes a {@link CommandNotification}s with the periodic service to + * generate notifications at a regular interval indicated by the CommandNotification. + * @param notification - CommandNotification to be registered or deleted from the periodic update + * service. + */ + public void processNextCommandNotification(CommandNotification notification); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java new file mode 100644 index 0000000..4ac9089 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java @@ -0,0 +1,41 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import org.apache.rya.periodic.notification.notification.TimestampedNotification; + +/** + * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}. + * It is expected that the NotificationCoordinatorExecutor will this Object with notifications to perform work via some sort + * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving + * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them + * and then providing them to another service to be exported. + * + */ +public interface NotificationProcessor { + + /** + * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results + * associated the query id given by {@link TimestampedNotification#getId()}. + * @param notification - contains information about which query results to retrieve + */ + public void processNotification(TimestampedNotification notification); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java new file mode 100644 index 0000000..ff08733 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.api; + +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; + +/** + * Object to register {@link PeriodicNotification}s with an external queuing + * service to be handled by a {@link NotificationCoordinatorExecutor} service. + * The service will generate notifications to process Periodic Query results at regular + * intervals corresponding the period of the PeriodicNotification. + * + */ +public interface PeriodicNotificationClient extends AutoCloseable { + + /** + * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor} + * @param notification - notification to be added + */ + public void addNotification(PeriodicNotification notification); + + /** + * Deletes a notification from the {@link NotificationCoordinatorExecutor}. + * @param notification - notification to be deleted + */ + public void deleteNotification(BasicNotification notification); + + /** + * Deletes a notification from the {@link NotificationCoordinatorExecutor}. + * @param notification - id corresponding to the notification to be deleted + */ + public void deleteNotification(String notificationId); + + /** + * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor} + * @param id - Periodic Query id + * @param period - period indicating frequency at which notifications will be generated + * @param delay - initial delay for starting periodic notifications + * @param unit - time unit of delay and period + */ + public void addNotification(String id, long period, long delay, TimeUnit unit); + + public void close(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java new file mode 100644 index 0000000..c31a5c0 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; + +/** + * Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic + * Query with the indicated id. + * + */ +public class BasicNotification implements Notification { + + private String id; + + /** + * Creates a BasicNotification + * @param id - Fluo query id associated with this Notification + */ + public BasicNotification(String id) { + this.id = id; + } + + /** + * @return the Fluo Query Id that this notification will generate results for + */ + @Override + public String getId() { + return id; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof BasicNotification) { + BasicNotification not = (BasicNotification) other; + return Objects.equal(this.id, not.id); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + return builder.append("id").append("=").append(id).toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java new file mode 100644 index 0000000..597b228 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This Object contains a Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic Query with the + * indicated id. Additionally, the CommandNotification contains a + * {@link Command} about which action the + * {@link NotificationCoordinatorExecutor} should take (adding or deleting). + * CommandNotifications are meant to be added to an external work queue (such as + * Kafka) to be processed by the NotificationCoordinatorExecutor. + * + */ +public class CommandNotification implements Notification { + + private Notification notification; + private Command command; + + public enum Command { + ADD, DELETE + }; + + /** + * Creates a new CommandNotification + * @param command - the command associated with this notification (either add, update, or delete) + * @param notification - the underlying notification associated with this command + */ + public CommandNotification(Command command, Notification notification) { + this.notification = Preconditions.checkNotNull(notification); + this.command = Preconditions.checkNotNull(command); + } + + @Override + public String getId() { + return notification.getId(); + } + + /** + * Returns {@link Notification} contained by this CommmandNotification. + * @return - Notification contained by this Object + */ + public Notification getNotification() { + return this.notification; + } + + /** + * @return Command contained by this Object (either add or delete) + */ + public Command getCommand() { + return this.command; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof CommandNotification) { + CommandNotification cn = (CommandNotification) other; + return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hashCode(command, notification); + } + + @Override + public String toString() { + return new StringBuilder().append("command").append("=").append(command.toString()).append(";") + .append(notification.toString()).toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java new file mode 100644 index 0000000..aa9e581 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Preconditions; + +/** + * Notification Object used by the Periodic Query Service to inform workers to + * process results for a given Periodic Query with the indicated id. + * Additionally, this Object contains a period that indicates a frequency at + * which regular updates are generated. + * + */ +public class PeriodicNotification implements Notification { + + private String id; + private long period; + private TimeUnit periodTimeUnit; + private long initialDelay; + + /** + * Creates a PeriodicNotification. + * @param id - Fluo Query Id that this notification is associated with + * @param period - period at which notifications are generated + * @param periodTimeUnit - time unit associated with the period and delay + * @param initialDelay - amount of time to wait before generating the first notification + */ + public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { + this.id = Preconditions.checkNotNull(id); + this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit); + Preconditions.checkArgument(period > 0 && initialDelay >= 0); + this.period = period; + this.initialDelay = initialDelay; + } + + + /** + * Create a PeriodicNotification + * @param other - other PeriodicNotification used in copy constructor + */ + public PeriodicNotification(PeriodicNotification other) { + this(other.id, other.period, other.periodTimeUnit, other.initialDelay); + } + + public String getId() { + return id; + } + + /** + * @return - period at which regular notifications are generated + */ + public long getPeriod() { + return period; + } + + /** + * @return time unit of period and initial delay + */ + public TimeUnit getTimeUnit() { + return periodTimeUnit; + } + + /** + * @return amount of time to delay before beginning to generate notifications + */ + public long getInitialDelay() { + return initialDelay; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + String delim = "="; + String delim2 = ";"; + return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2) + .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim) + .append(initialDelay).toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof PeriodicNotification)) { + return false; + } + + PeriodicNotification notification = (PeriodicNotification) other; + return Objects.equals(this.id, notification.id) && (this.period == notification.period) + && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay); + } + + @Override + public int hashCode() { + return Objects.hash(id, period, periodTimeUnit, initialDelay); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private String id; + private long period; + private TimeUnit periodTimeUnit; + private long initialDelay = 0; + + /** + * @param id - periodic query id + * @return - builder to chain method calls + */ + public Builder id(String id) { + this.id = id; + return this; + } + + /** + * @param period of the periodic notification for generating regular notifications + * @return - builder to chain method calls + */ + public Builder period(long period) { + this.period = period; + return this; + } + + /** + * @param timeUnit of period and initial delay + * @return - builder to chain method calls + */ + public Builder timeUnit(TimeUnit timeUnit) { + this.periodTimeUnit = timeUnit; + return this; + } + + /** + * @param initialDelay - amount of time to wait before generating notifications + * @return - builder to chain method calls + */ + public Builder initialDelay(long initialDelay) { + this.initialDelay = initialDelay; + return this; + } + + /** + * Builds PeriodicNotification + * @return PeriodicNotification constructed from Builder specified parameters + */ + public PeriodicNotification build() { + return new PeriodicNotification(id, period, periodTimeUnit, initialDelay); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java new file mode 100644 index 0000000..38073ce --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to + * process results for a given Periodic Query with the indicated id. Additionally + * this Object contains a {@link Date} object to indicate the date time at which this + * notification was generated. + * + */ +public class TimestampedNotification extends PeriodicNotification { + + private Date date; + + /** + * Constructs a TimestampedNotification + * @param id - Fluo Query Id associated with this Notification + * @param period - period at which notifications are generated + * @param periodTimeUnit - time unit associated with period and initial delay + * @param initialDelay - amount of time to wait before generating first notification + */ + public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { + super(id, period, periodTimeUnit, initialDelay); + date = new Date(); + } + + /** + * Creates a TimestampedNotification + * @param notification - PeriodicNotification used to create this TimestampedNotification. + * This constructor creates a time stamp for the TimestampedNotification. + */ + public TimestampedNotification(PeriodicNotification notification) { + super(notification); + date = new Date(); + } + + /** + * @return timestamp at which this notification was generated + */ + public Date getTimestamp() { + return date; + } + + @Override + public String toString() { + return super.toString() + ";date=" + date; + } + +}
