http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java new file mode 100644 index 0000000..1eafc00 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo.integration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.PeriodicQueryTableNameFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.MapBindingSet; + +public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { + + private PeriodicQueryResultStorage periodicStorage; + private static final String RYA = "rya_"; + private static final PeriodicQueryTableNameFactory nameFactory = new PeriodicQueryTableNameFactory(); + private static final ValueFactory vf = new ValueFactoryImpl(); + + @Before + public void init() throws AccumuloException, AccumuloSecurityException { + super.getConnector().securityOperations().changeUserAuthorizations("root", new Authorizations("U")); + 
periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getConnector(), RYA); + } + + + @Test + public void testCreateAndMeta() throws PeriodicQueryStorageException { + + String sparql = "select ?x where { ?x <urn:pred> ?y.}"; + VariableOrder varOrder = new VariableOrder("periodicBinId", "x"); + PeriodicQueryStorageMetadata expectedMeta = new PeriodicQueryStorageMetadata(sparql, varOrder); + + String id = periodicStorage.createPeriodicQuery(sparql); + Assert.assertEquals(expectedMeta, periodicStorage.getPeriodicQueryMetadata(id)); + Assert.assertEquals(Arrays.asList(nameFactory.makeTableName(RYA, id)), periodicStorage.listPeriodicTables()); + periodicStorage.deletePeriodicQuery(id); + } + + + @Test + public void testAddListDelete() throws Exception { + + String sparql = "select ?x where { ?x <urn:pred> ?y.}"; + String id = periodicStorage.createPeriodicQuery(sparql); + + Set<BindingSet> expected = new HashSet<>(); + Set<VisibilityBindingSet> storageSet = new HashSet<>(); + + //add result matching user's visibility + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("periodicBinId", vf.createLiteral(1L)); + bs.addBinding("x",vf.createURI("uri:uri123")); + expected.add(bs); + storageSet.add(new VisibilityBindingSet(bs,"U")); + + //add result with different visibility that is not expected + bs = new QueryBindingSet(); + bs.addBinding("periodicBinId", vf.createLiteral(1L)); + bs.addBinding("x",vf.createURI("uri:uri456")); + storageSet.add(new VisibilityBindingSet(bs,"V")); + + periodicStorage.addPeriodicQueryResults(id, storageSet); + + Set<BindingSet> actual = new HashSet<>(); + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) { + iter.forEachRemaining(x -> actual.add(x)); + } + + Assert.assertEquals(expected, actual); + + periodicStorage.deletePeriodicQueryResults(id, 1L); + + Set<BindingSet> actual2 = new HashSet<>(); + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, 
Optional.of(1L))) { + iter.forEachRemaining(x -> actual2.add(x)); + } + + Assert.assertEquals(new HashSet<>(), actual2); + periodicStorage.deletePeriodicQuery(id); + + } + + @Test + public void multiBinTest() throws PeriodicQueryStorageException, Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "select ?id (count(?obs) as ?total) where {" //n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasId> ?id } group by ?id"; //n + + + final ValueFactory vf = new ValueFactoryImpl(); + long currentTime = System.currentTimeMillis(); + String queryId = UUID.randomUUID().toString().replace("-", ""); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expected1 = new HashSet<>(); + final Set<BindingSet> expected2 = new HashSet<>(); + final Set<BindingSet> expected3 = new HashSet<>(); + final Set<BindingSet> expected4 = new HashSet<>(); + final Set<VisibilityBindingSet> storageResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime/period)*period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expected1.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expected1.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + 
bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expected1.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expected1.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expected2.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expected2.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expected2.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expected3.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expected3.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + 
bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + expected4.add(bs); + storageResults.add(new VisibilityBindingSet(bs)); + + + String id = periodicStorage.createPeriodicQuery(queryId, sparql); + periodicStorage.addPeriodicQueryResults(queryId, storageResults); + + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) { + Set<BindingSet> actual1 = new HashSet<>(); + while(iter.hasNext()) { + actual1.add(iter.next()); + } + Assert.assertEquals(expected1, actual1); + } + + periodicStorage.deletePeriodicQueryResults(queryId, binId); + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) { + Set<BindingSet> actual1 = new HashSet<>(); + while(iter.hasNext()) { + actual1.add(iter.next()); + } + Assert.assertEquals(Collections.emptySet(), actual1); + } + + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) { + Set<BindingSet> actual2 = new HashSet<>(); + while(iter.hasNext()) { + actual2.add(iter.next()); + } + Assert.assertEquals(expected2, actual2); + } + + periodicStorage.deletePeriodicQueryResults(queryId, binId + period); + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) { + Set<BindingSet> actual2 = new HashSet<>(); + while(iter.hasNext()) { + actual2.add(iter.next()); + } + Assert.assertEquals(Collections.emptySet(), actual2); + } + + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 2*period))) { + Set<BindingSet> actual3 = new HashSet<>(); + while(iter.hasNext()) { + actual3.add(iter.next()); + } + Assert.assertEquals(expected3, actual3); + } + + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 3*period))) { + 
Set<BindingSet> actual4 = new HashSet<>(); + while(iter.hasNext()) { + actual4.add(iter.next()); + } + Assert.assertEquals(expected4, actual4); + } + periodicStorage.deletePeriodicQuery(id); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/README.md ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/README.md b/extras/rya.pcj.fluo/README.md index 70361c1..1207705 100644 --- a/extras/rya.pcj.fluo/README.md +++ b/extras/rya.pcj.fluo/README.md @@ -19,7 +19,15 @@ Rya Incrementally Updating Precomputed Joins ============================================ This project is an implementation of the Rya Precomputed Join (PCJ) indexing feature that runs on top of [Fluo][1] so that it may incrementally update the -results of a query as new semantic triples are added to storage. +results of a query as new semantic triples are added to storage. At a high level, the Rya Fluo application +works by registering the individual RDF4J QueryNodes with the Fluo table in the form of metadata. For example, +if a join occurs in a given query, then that join is given a unique id when the query is registered with the Rya +Fluo application, along with metadata indicating its parent node, its left and right child nodes, along with +other information necessary for the application to process the join. In this way, the entire RDF4J query tree is recreated +within Fluo. For each node type supported by the Rya Fluo application, there is also an associated Fluo Observer +that processes BindingSet notifications for that node (this occurs when a new result percolates up the query tree and +arrives at that node in the form of a BindingSet). These Observers incrementally evaluate the queries registered with the +Fluo application by performing the processing required for their associated node as soon as a result for that node is available. 
This project contains the following modules: * **rya.pcj.fluo.app** - A Fluo application that incrementally updates the results @@ -38,5 +46,20 @@ This project contains the following modules: * **integration** - Contains integration tests that use a MiniAccumuloCluster and MiniFluo to ensure the Rya PCJ Fluo App work within an emulation of the production environment. + + +Currently the Rya Fluo Application supports RDF4J queries that contain Joins, Filters, Projections, StatementPatterns, and Aggregations. +To support the evaluation of additional RDF4J query nodes in the Fluo application, here are the steps that need to be followed: + + 1. Create the appropriate Metadata Object by extending CommonNodeMetadata (e.g. StatementPatternMetadata, JoinMetadata, etc.) + 2. Add metadata Columns to FluoQueryColumns + 3. Create NodeType from the metadata Columns + 4. Add the node prefix to IncrementalUpdateConstants + 5. Integrate metadata with FluoQueryMetadataDAO + 6. Create Updater and integrate with BindingSetUpdater + 7. Create Observer (e.g. StatementPatternObserver, JoinObserver, etc.) + 8. Integrate with SparqlFluoQueryBuilder + +All of the classes mentioned above can be found in the rya.pcj.fluo.app project. 
[1]: http://fluo.io/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index a17f02f..767d9d2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; @@ -44,9 +45,11 @@ import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.persist.query.BatchRyaQuery; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; @@ -62,6 +65,8 @@ import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; +import com.google.common.base.Preconditions; + import 
edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -152,13 +157,51 @@ public class CreatePcj { + /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method requires that a - * PCJ table already exist for the query corresponding to the pcjId. Results will be exported - * to this table. + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method + * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated + * inside of Fluo. This method assumes that the user will export the results to Kafka or + * some other external resource. The export id is equivalent to the queryId that is returned, + * which is in contrast to the other createPcj methods in this class which accept an external pcjId + * that is used to identify the Accumulo table or Kafka topic for exporting results. + * + * @param sparql - sparql query String to be registered with Fluo + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return queryId - The id of the root of the query metadata tree in Fluo + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
+ */ + public String createPcj(String sparql, FluoClient fluo) throws MalformedQueryException { + Preconditions.checkNotNull(sparql); + Preconditions.checkNotNull(fluo); + + FluoQuery fluoQuery = makeFluoQuery(sparql); + String queryId = null; + if(fluoQuery.getQueryMetadata().isPresent()) { + queryId = fluoQuery.getQueryMetadata().get().getNodeId(); + queryId = queryId.split(IncrementalUpdateConstants.QUERY_PREFIX)[1]; + } else { + queryId = fluoQuery.getConstructQueryMetadata().get().getNodeId(); + queryId = queryId.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1]; + } + + String[] idArray = queryId.split("_"); + String id = idArray[idArray.length - 1]; + + writeFluoQuery(fluo, fluoQuery, id); + return id; + } + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides + * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely + * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated + * inside of Fluo. This method assumes that the user will export the results to Kafka or + * some other external resource. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param sparql - sparql query String to be registered with Fluo * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @return The metadata that was written to the Fluo application for the PCJ. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. 
@@ -166,40 +209,113 @@ public class CreatePcj { */ public FluoQuery createPcj( final String pcjId, - final PrecomputedJoinStorage pcjStorage, + final String sparql, final FluoClient fluo) throws MalformedQueryException, PcjException { requireNonNull(pcjId); - requireNonNull(pcjStorage); + requireNonNull(sparql); requireNonNull(fluo); + FluoQuery fluoQuery = makeFluoQuery(sparql); + writeFluoQuery(fluo, fluoQuery, pcjId); + + return fluoQuery; + } + + private FluoQuery makeFluoQuery(String sparql) throws MalformedQueryException { + // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. // We use these IDs later when scanning Rya for historic Statement Pattern matches // as well as setting up automatic exports. final NodeIds nodeIds = new NodeIds(); // Parse the query's structure for the metadata that will be written to fluo. - final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); - + return new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + } + + private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) { try (Transaction tx = fluo.newTransaction()) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); - - if (fluoQuery.getQueryMetadata().isPresent()) { - // If the query is not a construct query, - // the results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. - final String queryId = fluoQuery.getQueryMetadata().get().getNodeId(); + + // The results of the query are eventually exported to an instance + // of Rya, so store the Rya ID for the PCJ. 
+ QueryMetadata metadata = fluoQuery.getQueryMetadata().orNull(); + if (metadata != null) { + String queryId = metadata.getNodeId(); tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - } + } + // Flush the changes to Fluo. tx.commit(); } + } - return fluoQuery; + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an + * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + */ + public FluoQuery createPcj( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo) throws MalformedQueryException, PcjException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + + // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + return createPcj(pcjId, sparql, fluo); + } + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * <p> + * This call scans Rya for Statement Pattern matches and inserts them into + * the Fluo application. This method does not verify that a PcjTable with the + * the given pcjId actually exists. It is assumed that results for any query registered + * using this method will be exported to Kafka or some other external service. 
+ * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param sparql - sparql query that will registered with Fluo. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @param queryEngine - QueryEngine for a given Rya Instance, (not null) + * @return The Fluo application's Query ID of the query that was created. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. + */ + public String withRyaIntegration( + final String pcjId, + final String sparql, + final FluoClient fluo, + final Connector accumulo, + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(sparql); + requireNonNull(fluo); + requireNonNull(accumulo); + requireNonNull(ryaInstance); + + + // Write the SPARQL query's structure to the Fluo Application. + final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo); + //import results already ingested into Rya that match query + importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); + // return queryId to the caller for later monitoring from the export. + return fluoQuery.getQueryMetadata().get().getNodeId(); } + /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. @@ -231,31 +347,39 @@ public class CreatePcj { requireNonNull(fluo); requireNonNull(accumulo); requireNonNull(ryaInstance); - - // Write the SPARQL query's structure to the Fluo Application. - final FluoQuery fluoQuery = createPcj(pcjId, pcjStorage, fluo); - + + // Parse the query's structure for the metadata that will be written to fluo. 
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + + return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance); + } + + private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance) + throws RyaDAOException { // Reuse the same set object while performing batch inserts. final Set<RyaStatement> queryBatch = new HashSet<>(); - // Iterate through each of the statement patterns and insert their historic matches into Fluo. + // Iterate through each of the statement patterns and insert their + // historic matches into Fluo. for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { - // Get an iterator over all of the binding sets that match the statement pattern. + // Get an iterator over all of the binding sets that match the + // statement pattern. final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern()); queryBatch.add(spToRyaStatement(pattern)); } - //Create AccumuloRyaQueryEngine to query for historic results + // Create AccumuloRyaQueryEngine to query for historic results final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setTablePrefix(ryaInstance); conf.setAuths(getAuths(accumulo)); - try(final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); + try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) { final Set<RyaStatement> triplesBatch = new HashSet<>(); // Insert batches of the binding sets into Fluo. 
- for(final RyaStatement ryaStatement : queryIterable) { + for (final RyaStatement ryaStatement : queryIterable) { if (triplesBatch.size() == spInsertBatchSize) { writeBatch(fluo, triplesBatch); triplesBatch.clear(); @@ -271,14 +395,6 @@ public class CreatePcj { } catch (final IOException e) { log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e); } - - //return queryId to the caller for later monitoring from the export - if(fluoQuery.getConstructQueryMetadata().isPresent()) { - return fluoQuery.getConstructQueryMetadata().get().getNodeId(); - } - - return fluoQuery.getQueryMetadata().get().getNodeId(); - } private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java index 87eb9cc..3052c1d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java @@ -39,6 +39,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.openrdf.query.BindingSet; @@ -50,12 +51,12 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <p> * This is a two 
phase process. * <ol> - * <li>Delete metadata about each node of the query using a single Fluo - * transaction. This prevents new {@link BindingSet}s from being created when - * new triples are inserted.</li> - * <li>Delete BindingSets associated with each node of the query. This is done - * in a batch fashion to guard against large delete transactions that don't fit - * into memory.</li> + * <li>Delete metadata about each node of the query using a single Fluo + * transaction. This prevents new {@link BindingSet}s from being created when + * new triples are inserted.</li> + * <li>Delete BindingSets associated with each node of the query. This is done + * in a batch fashion to guard against large delete transactions that don't fit + * into memory.</li> * </ol> */ @DefaultAnnotation(NonNull.class) @@ -79,8 +80,10 @@ public class DeletePcj { * Precomputed Join Index from the Fluo application that is incrementally * updating it. * - * @param client - Connects to the Fluo application that is updating the PCJ Index. (not null) - * @param pcjId - The PCJ ID for the query that will removed from the Fluo application. (not null) + * @param client - Connects to the Fluo application that is updating the PCJ + * Index. (not null) + * @param pcjId - The PCJ ID for the query that will removed from the Fluo + * application. 
(not null) */ public void deletePcj(final FluoClient client, final String pcjId) { requireNonNull(client); @@ -167,6 +170,12 @@ public class DeletePcj { nodeIds.add(aggChild); getChildNodeIds(tx, aggChild, nodeIds); break; + case PERIODIC_QUERY: + final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId); + final String periodicChild = periodicMeta.getChildNodeId(); + nodeIds.add(periodicChild); + getChildNodeIds(tx, periodicChild, nodeIds); + break; case STATEMENT_PATTERN: break; } @@ -215,10 +224,9 @@ public class DeletePcj { } } - /** - * Deletes high level query meta for converting from queryId to pcjId and vice - * versa, as well as converting from sparql to queryId. + * Deletes high level query meta for converting from queryId to pcjId and + * vice versa, as well as converting from sparql to queryId. * * @param tx - Transaction the deletes will be performed with. (not null) * @param pcjId - The PCJ whose metadata will be deleted. (not null) @@ -234,7 +242,6 @@ public class DeletePcj { tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID); } - /** * Deletes all results (BindingSets or Statements) associated with the specified nodeId. 
* @@ -265,18 +272,18 @@ public class DeletePcj { requireNonNull(scanner); requireNonNull(column); - try(Transaction ntx = tx) { - int count = 0; - final Iterator<RowColumnValue> iter = scanner.iterator(); - while (iter.hasNext() && count < batchSize) { - final Bytes row = iter.next().getRow(); - count++; - tx.delete(row, column); - } + try (Transaction ntx = tx) { + int count = 0; + final Iterator<RowColumnValue> iter = scanner.iterator(); + while (iter.hasNext() && count < batchSize) { + final Bytes row = iter.next().getRow(); + count++; + tx.delete(row, column); + } - final boolean hasNext = iter.hasNext(); - tx.commit(); - return hasNext; + final boolean hasNext = iter.hasNext(); + tx.commit(); + return hasNext; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index 38fff95..b151c0e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -1,35 +1,28 @@ <?xml version="1.0" encoding="utf-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. 
You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - http://www.apache.org/licenses/LICENSE-2.0 + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.app</artifactId> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.parent</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>rya.pcj.fluo.app</artifactId> - - <name>Apache Rya PCJ Fluo App</name> - <description> + <name>Apache Rya PCJ Fluo App</name> + <description> A Fluo implementation of Rya Precomputed Join Indexing. This module produces a jar that may be executed by the 'fluo' command line tool as a YARN job. </description> @@ -72,6 +65,10 @@ under the License. 
<groupId>org.apache.fluo</groupId> <artifactId>fluo-recipes-accumulo</artifactId> </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryrender</artifactId> + </dependency> <dependency> <groupId>org.apache.kafka</groupId> @@ -123,27 +120,29 @@ under the License. </dependency> </dependencies> - <build> - <plugins> - <!-- Use the pre-build 'jar-with-dependencies' assembly to package the dependent class files into the final jar. - This creates a jar file that can be deployed to Fluo without having to include any dependent jars. --> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + + <build> + <plugins> + <!-- Use the pre-build 'jar-with-dependencies' assembly to package the + dependent class files into the final jar. This creates a jar file that can + be deployed to Fluo without having to include any dependent jars. 
--> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java deleted file mode 100644 index ae976ee..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.app; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.concurrent.atomic.AtomicReference; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -import org.openrdf.query.algebra.Filter; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.base.Optional; - -/** - * Searches a SPARQL query for {@link Filter}s. - */ -@DefaultAnnotation(NonNull.class) -class FilterFinder { - - /** - * Search a SPARQL query for the {@link Filter} that appears at the - * {@code indexWithinQuery}'th time within the query. - * <p> - * The top most filter within the query will be at index 0, the next filter - * encountered will be at index 1, ... and the last index that is encountered - * will be at index <i>n</i>. - * - * @param sparql - The SPARQL query that to parse. (not null) - * @param indexWithinQuery - The index of the filter to fetch. (not null) - * @return The filter that was found within the query at the specified index; - * otherwise absent. - * @throws Exception Thrown when the query could not be parsed or iterated over. - */ - public Optional<Filter> findFilter(final String sparql, final int indexWithinQuery) throws Exception { - checkNotNull(sparql); - checkArgument(indexWithinQuery >= 0); - - // When a filter is encountered for the requested index, store it in atomic reference and quit searching. - final AtomicReference<Filter> filterRef = new AtomicReference<>(); - final QueryModelVisitorBase<RuntimeException> filterFinder = new QueryModelVisitorBase<RuntimeException>() { - private int i = 0; - @Override - public void meet(final Filter filter) { - // Store and stop searching. 
- if(i == indexWithinQuery) { - filterRef.set(filter); - return; - } - - // Continue to the next filter. - i++; - super.meet(filter); - } - }; - - // Parse the query and find the filter. - final SPARQLParser parser = new SPARQLParser(); - final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - parsedQuery.getTupleExpr().visit(filterFinder); - return Optional.fromNullable(filterRef.get()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 42ec686..1c99051 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -26,9 +26,11 @@ import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.URI; @@ -62,11 +64,6 @@ public class FilterResultUpdater { private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe(); /** - * A utility class used to search SPARQL queries for Filters. - */ - private static final FilterFinder filterFinder = new FilterFinder(); - - /** * Is used to evaluate the conditions of a {@link Filter}. */ private static final EvaluationStrategyImpl evaluator = new EvaluationStrategyImpl( @@ -111,12 +108,11 @@ public class FilterResultUpdater { "Binding Set:\n" + childBindingSet + "\n"); // Parse the original query and find the Filter that represents filterId. - final String sparql = filterMetadata.getOriginalSparql(); - final int indexWithinQuery = filterMetadata.getFilterIndexWithinSparql(); - final Optional<Filter> filter = filterFinder.findFilter(sparql, indexWithinQuery); + final String sparql = filterMetadata.getFilterSparql(); + Filter filter = FilterSerializer.deserialize(sparql); // Evaluate whether the child BindingSet satisfies the filter's condition. - final ValueExpr condition = filter.get().getCondition(); + final ValueExpr condition = filter.getCondition(); if (isTrue(condition, childBindingSet)) { // Create the Filter's binding set from the child's. 
final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index f9d14b5..2084907 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -18,6 +18,8 @@ */ package org.apache.rya.indexing.pcj.fluo.app; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; + public class IncrementalUpdateConstants { // String constants used to create more easily parsed patterns. 
@@ -34,6 +36,9 @@ public class IncrementalUpdateConstants { public static final String AGGREGATION_PREFIX = "AGGREGATION"; public static final String QUERY_PREFIX = "QUERY"; public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; + public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; + + public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index 2cb5a54..9b65b34 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -43,6 +43,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java 
---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java index b829b7e..b8fc2d9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -25,6 +25,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FI import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; import java.util.List; @@ -38,6 +39,7 @@ import com.google.common.base.Optional; * Represents the different types of nodes that a Query may have. 
*/ public enum NodeType { + PERIODIC_QUERY(QueryNodeMetadataColumns.PERIODIC_QUERY_COLUMNS, FluoQueryColumns.PERIODIC_QUERY_BINDING_SET), FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET), JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), @@ -101,6 +103,8 @@ public enum NodeType { type = AGGREGATION; } else if(nodeId.startsWith(CONSTRUCT_PREFIX)) { type = CONSTRUCT; + } else if(nodeId.startsWith(PERIODIC_QUERY_PREFIX)) { + type = PERIODIC_QUERY; } return Optional.fromNullable(type); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java new file mode 100644 index 0000000..ae4912b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.Binding; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +/** + * This class adds the appropriate BinId Binding to each BindingSet that it processes. The BinIds + * are used to determine which period a BindingSet (with a temporal Binding) falls into so that + * a user can receive periodic updates for a registered query. + * + */ +public class PeriodicQueryUpdater { + + private static final Logger log = Logger.getLogger(PeriodicQueryUpdater.class); + private static final ValueFactory vf = new ValueFactoryImpl(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + + /** + * Uses the {@link PeriodicQueryMetadata} to create a collection of binned BindingSets + * that are added to Fluo. 
Each binned BindingSet is the original BindingSet with an additional + * Binding that contains the periodic bin id of the BindingSet. + * @param tx - Fluo Transaction + * @param bs - VisibilityBindingSet that will be binned + * @param metadata - PeriodicQueryMetadata used to bin BindingSets + * @throws Exception + */ + public void updatePeriodicBinResults(TransactionBase tx, VisibilityBindingSet bs, PeriodicQueryMetadata metadata) throws Exception { + Set<Long> binIds = getBinEndTimes(metadata, bs); + for(Long id: binIds) { + //create binding set value bytes + QueryBindingSet binnedBs = new QueryBindingSet(bs); + binnedBs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(id)); + VisibilityBindingSet visibilityBindingSet = new VisibilityBindingSet(binnedBs, bs.getVisibility()); + Bytes periodicBsBytes = BS_SERDE.serialize(visibilityBindingSet); + + //create row + final Bytes resultRow = RowKeyUtil.makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), binnedBs); + Column col = FluoQueryColumns.PERIODIC_QUERY_BINDING_SET; + tx.set(resultRow, col, periodicBsBytes); + } + } + + /** + * This method returns the end times of all period windows containing the time contained in + * the BindingSet. 
+ * + * @param metadata + * @return Set of period bin end times + */ + private Set<Long> getBinEndTimes(PeriodicQueryMetadata metadata, VisibilityBindingSet bs) { + Set<Long> binIds = new HashSet<>(); + try { + String timeVar = metadata.getTemporalVariable(); + Value value = bs.getBinding(timeVar).getValue(); + Literal temporalLiteral = (Literal) value; + long eventDateTime = temporalLiteral.calendarValue().toGregorianCalendar().getTimeInMillis(); + return getEndTimes(eventDateTime, metadata.getWindowSize(), metadata.getPeriod()); + } catch (Exception e) { + log.trace("Unable to extract the entity time from BindingSet: " + bs); + } + return binIds; + } + + private long getRightBinEndPoint(long eventDateTime, long periodDuration) { + return (eventDateTime / periodDuration + 1) * periodDuration; + } + + private long getLeftBinEndPoint(long eventTime, long periodDuration) { + return (eventTime / periodDuration) * periodDuration; + } + + /** + * Using the smallest period end time, this method also creates all other period end times + * that occur within one windowSize of the eventDateTime. 
+ * @param eventDateTime + * @param startTime + * @param windowDuration + * @param periodDuration + * @return Set of period bin end times + */ + private Set<Long> getEndTimes(long eventDateTime, long windowDuration, long periodDuration) { + Set<Long> binIds = new HashSet<>(); + long rightEventBin = getRightBinEndPoint(eventDateTime, periodDuration); + //get the bin left of the current moment for comparison + long currentBin = getLeftBinEndPoint(System.currentTimeMillis(), periodDuration); + + if(currentBin >= rightEventBin) { + long numBins = (windowDuration -(currentBin - rightEventBin))/periodDuration; + for(int i = 0; i < numBins; i++) { + binIds.add(currentBin + i*periodDuration); + } + } else { + //this corresponds to a future event that is inserted into the system + long numBins = windowDuration/periodDuration; + for(int i = 0; i < numBins; i++) { + binIds.add(rightEventBin + i*periodDuration); + } + } + + return binIds; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index ba82726..44fc9bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -31,6 +31,7 @@ import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.query.BindingSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -88,7 +89,7 @@ public class QueryResultUpdater { } // Create the Binding Set that goes in the Node Value. It does contain visibilities. - final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); + final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet,childBindingSet.getVisibility())); log.trace( "Transaction ID: " + tx.getStartTimestamp() + "\n" + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java deleted file mode 100644 index 34439e4..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDe.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app; - -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.fluo.api.data.Bytes; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects. - */ -@DefaultAnnotation(NonNull.class) -public class VisibilityBindingSetSerDe { - - /** - * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object. - * - * @param bindingSet - The binding set that will be serialized. (not null) - * @return The serialized object. - * @throws Exception A problem was encountered while serializing the object. - */ - public Bytes serialize(final VisibilityBindingSet bindingSet) throws Exception { - requireNonNull(bindingSet); - - final ByteArrayOutputStream boas = new ByteArrayOutputStream(); - try(final ObjectOutputStream oos = new ObjectOutputStream(boas)) { - oos.writeObject(bindingSet); - } - - return Bytes.of(boas.toByteArray()); - } - - /** - * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object. - * - * @param bytes - The bytes that will be deserialized. (not null) - * @return The deserialized object. - * @throws Exception A problem was encountered while deserializing the object. 
- */ - public VisibilityBindingSet deserialize(final Bytes bytes) throws Exception { - requireNonNull(bytes); - - try(final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toArray()))) { - final Object o = ois.readObject(); - if(o instanceof VisibilityBindingSet) { - return (VisibilityBindingSet) o; - } else { - throw new Exception("Deserialized Object is not a VisibilityBindingSet. Was: " + o.getClass()); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java new file mode 100644 index 0000000..db33d3b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +/** + * This class provides common functionality for implementations of {@link BatchBindingSetUpdater}. + * + */ +public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetUpdater { + + /** + * Updates the Span to create a new {@link BatchInformation} object to be fed to the + * {@link BatchObserver}. This message is called in the event that the BatchBindingSetUpdater + * reaches the batch size before processing all entries relevant to its Span. + * @param newStart - new start to the Span + * @param oldSpan - old Span to be updated with newStart + * @return - updated Span used with an updated BatchInformation object to complete the batch task + */ + public Span getNewSpan(RowColumn newStart, Span oldSpan) { + return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive()); + } + + /** + * Cleans up old batch job. This method is meant to be called by any overriding method + * to clean up old batch tasks. 
+ */ + @Override + public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { + tx.delete(row, FluoQueryColumns.BATCH_COLUMN); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java new file mode 100644 index 0000000..498dd85 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +import jline.internal.Preconditions; + +/** + * Abstract class for generating span based notifications. 
A spanned notification + * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span. + * + */ +public abstract class AbstractSpanBatchInformation extends BasicBatchInformation { + + private Span span; + + /** + * Create AbstractBatchInformation + * @param batchSize - size of batch to be processed + * @param task - type of task processed (Add, Delete, Udpate) + * @param column - Cpolumn that Span notification is applied + * @param span - span used to indicate where processing should begin + */ + public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) { + super(batchSize, task, column); + this.span = Preconditions.checkNotNull(span); + } + + public AbstractSpanBatchInformation(Task task, Column column, Span span) { + this(DEFAULT_BATCH_SIZE, task, column, span); + } + + /** + * @return Span that batch Task will be applied to + */ + public Span getSpan() { + return span; + } + + /** + * Sets span to which batch Task will be applied + * @param span + */ + public void setSpan(Span span) { + this.span = span; + } + + @Override + public String toString() { + return new StringBuilder() + .append("Span Batch Information {\n") + .append(" Span: " + span + "\n") + .append(" Batch Size: " + super.getBatchSize() + "\n") + .append(" Task: " + super.getTask() + "\n") + .append(" Column: " + super.getColumn() + "\n") + .append("}") + .toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof AbstractSpanBatchInformation)) { + return false; + } + + AbstractSpanBatchInformation batch = (AbstractSpanBatchInformation) other; + return (super.getBatchSize() == batch.getBatchSize()) && Objects.equals(super.getColumn(), batch.getColumn()) && Objects.equals(this.span, batch.span) + && Objects.equals(super.getTask(), batch.getTask()); + } + + @Override + public int hashCode() { + return Objects.hash(super.getBatchSize(), span, 
super.getColumn(), super.getTask()); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java new file mode 100644 index 0000000..288ed6e --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java @@ -0,0 +1,81 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; + +import com.google.common.base.Preconditions; + +/** + * This class contains all of the common info contained in other implementations + * of BatchInformation. 
+ * + */ +public abstract class BasicBatchInformation implements BatchInformation { + + private int batchSize; + private Task task; + private Column column; + + /** + * Create BasicBatchInformation object + * @param batchSize - size of batch to be processed + * @param task - task to be processed + * @param column - Column in which data is processed + */ + public BasicBatchInformation(int batchSize, Task task, Column column ) { + this.task = Preconditions.checkNotNull(task); + this.column = Preconditions.checkNotNull(column); + Preconditions.checkArgument(batchSize > 0); + this.batchSize = batchSize; + } + + /** + * Creates a BasicBatchInformation + * @param task + */ + public BasicBatchInformation(Task task, Column column) { + Preconditions.checkNotNull(task); + Preconditions.checkNotNull(column); + this.task = task; + this.column = column; + this.batchSize = DEFAULT_BATCH_SIZE; + } + + /** + * @return - size of batch that tasks are performed in + */ + public int getBatchSize() { + return batchSize; + } + + /** + * @return - type of Task performed (Add, Delete, Update) + */ + public Task getTask() { + return task; + } + + /** + * @return - Column in which Task will be performed + */ + public Column getColumn() { + return column; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java new file mode 100644 index 0000000..2076d2d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchBindingSetUpdater.java @@ -0,0 +1,43 @@ +package 
org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +/** + * Interface for applying batch updates to the Fluo table based on the provided {@link BatchInformation}. + * This updater is used by the {@link BatchObserver} to apply batch updates to overcome the restriction + * that all transactions are processed in memory. This allows Observers to process potentially large + * tasks that cannot fit into memory in a piece-wise, batch fashion. + */ +public interface BatchBindingSetUpdater { + + /** + * Processes the {@link BatchInformation} object. The BatchInformation will + * typically include a Task (either Add, Update, or Delete), along with information + * about the starting point to begin processing data. 
+ * @param tx - Fluo Transaction + * @param row - contains the ID of the Fluo node to be processed + * @param batch - contains info about which cells for the Fluo query result node + * should be processed + * @throws Exception + */ + public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java new file mode 100644 index 0000000..7b23ee7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformation.java @@ -0,0 +1,57 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import org.apache.fluo.api.data.Column; + +/** + * Interface for submitting batch Fluo tasks to be processed by the + * {@link BatchObserver}. The BatchObserver applies batch updates to overcome + * the restriction that all Fluo transactions are processed in memory. This + * allows the Rya Fluo application to process large tasks that cannot fit into + * memory in a piece-wise, batch fashion. + */ +public interface BatchInformation { + + public static enum Task {Add, Delete, Update} + public static int DEFAULT_BATCH_SIZE = 5000; + + /** + * @return batchsize of task + */ + public int getBatchSize(); + + /** + * + * @return Task to be performed + */ + public Task getTask(); + + /** + * + * @return Column that task will be performed on + */ + public Column getColumn(); + + /** + * + * @return BatchBindingSetUpdater used to process this Batch Task + */ + public BatchBindingSetUpdater getBatchUpdater(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java new file mode 100644 index 0000000..f9ed658 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchInformationDAO.java @@ -0,0 +1,59 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.batch.serializer.BatchInformationSerializer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +/** + * Class used for reading and writing {@link BatchInformation}. + * + */ +public class BatchInformationDAO { + + /** + * Adds BatchInformation to the {@link FluoQueryColumns#BATCH_COLUMN}. 
+ * @param tx - Fluo Transaction + * @param nodeId - query node that batch task will be performed on + * @param batch - BatchInformation to be processed + */ + public static void addBatch(TransactionBase tx, String nodeId, BatchInformation batch) { + Bytes row = BatchRowKeyUtil.getRow(nodeId); + tx.set(row, FluoQueryColumns.BATCH_COLUMN, Bytes.of(BatchInformationSerializer.toBytes(batch))); + } + + /** + * Retrieve BatchInformation + * @param tx - Fluo transaction + * @param row - row that contains batch information - this is the query id that batch task will be performed on + * @return Optional containing the BatchInformation if it is there + */ + public static Optional<BatchInformation> getBatchInformation(TransactionBase tx, Bytes row) { + Bytes val = tx.get(row, FluoQueryColumns.BATCH_COLUMN); + if(val != null) { + return BatchInformationSerializer.fromBytes(val.toArray()); + } else { + return Optional.empty(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java new file mode 100644 index 0000000..6194236 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BatchObserver.java @@ -0,0 +1,63 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +/** + * BatchObserver processes tasks that need to be broken into batches. Entries + * stored in this {@link FluoQueryColumns#BATCH_COLUMN} are of the form + * Row: nodeId, Value: BatchInformation. The nodeId indicates the node that the + * batch operation will be performed on. All batch operations are performed on + * the bindingSet column for the {@link NodeType} corresponding to the given + * nodeId. For example, if the nodeId indicated that the NodeType was + * StatementPattern, then the batch operation would be performed on + * {@link FluoQueryColumns#STATEMENT_PATTERN_BINDING_SET}. This Observer applies + * batch updates to overcome the restriction that all Fluo transactions are processed + * in memory. This allows the Rya Fluo application to process large tasks that cannot + * fit into memory in a piece-wise, batch fashion. 
+ */ +public class BatchObserver extends AbstractObserver { + + /** + * Processes the BatchInformation objects when they're written to the Batch column + * @param tx - Fluo transaction + * @param row - row that contains {@link BatchInformation} + * @param col - column that contains BatchInformation + */ + @Override + public void process(TransactionBase tx, Bytes row, Column col) throws Exception { + Optional<BatchInformation> batchInfo = BatchInformationDAO.getBatchInformation(tx, row); + if(batchInfo.isPresent()) { + batchInfo.get().getBatchUpdater().processBatch(tx, row, batchInfo.get()); + } + } + + @Override + public ObservedColumn getObservedColumn() { + return new ObservedColumn(FluoQueryColumns.BATCH_COLUMN, NotificationType.STRONG); + } + +}
