RYA-406. Closes #251.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/62de7c5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/62de7c5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/62de7c5d Branch: refs/heads/master Commit: 62de7c5d16ac6624228fd2c03f78b09845c1e74b Parents: 6d2bfcb Author: Caleb Meier <[email protected]> Authored: Mon Nov 6 12:23:57 2017 -0800 Committer: Caleb Meier <[email protected]> Committed: Tue Dec 5 11:47:45 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/RyaClientExample.java | 3 +- .../notification/pruner/FluoBinPruner.java | 9 +- .../rya.manual/src/site/markdown/pcj-updater.md | 186 +++++++++++++++++++ .../indexing/pcj/fluo/api/DeleteFluoPcj.java | 59 +++--- .../indexing/pcj/fluo/api/GetQueryReport.java | 21 ++- .../indexing/pcj/fluo/api/InsertTriples.java | 26 ++- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 40 ++-- .../pcj/fluo/app/AbstractNodeUpdater.java | 45 +++++ .../pcj/fluo/app/AggregationResultUpdater.java | 5 +- .../indexing/pcj/fluo/app/BindingSetRow.java | 41 +++- .../fluo/app/ConstructQueryResultUpdater.java | 13 +- .../pcj/fluo/app/FilterResultUpdater.java | 5 +- .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 11 +- .../fluo/app/IncrementalUpdateConstants.java | 21 ++- .../pcj/fluo/app/JoinResultUpdater.java | 62 +++---- .../pcj/fluo/app/PeriodicQueryUpdater.java | 21 +-- .../pcj/fluo/app/ProjectionResultUpdater.java | 10 +- .../pcj/fluo/app/QueryResultUpdater.java | 5 +- .../batch/AbstractBatchBindingSetUpdater.java | 6 +- .../app/batch/AbstractSpanBatchInformation.java | 8 +- .../app/batch/JoinBatchBindingSetUpdater.java | 16 +- .../app/batch/SpanBatchBindingSetUpdater.java | 60 ++++-- .../app/batch/SpanBatchDeleteInformation.java | 78 +++++++- .../SpanBatchInformationTypeAdapter.java | 17 +- .../app/export/rya/RyaSubGraphExporter.java | 13 +- 
.../fluo/app/observers/AggregationObserver.java | 9 +- .../observers/ConstructQueryResultObserver.java | 12 +- .../pcj/fluo/app/observers/FilterObserver.java | 3 +- .../pcj/fluo/app/observers/JoinObserver.java | 3 +- .../app/observers/PeriodicQueryObserver.java | 5 +- .../fluo/app/observers/ProjectionObserver.java | 3 +- .../fluo/app/observers/QueryResultObserver.java | 8 +- .../app/observers/StatementPatternObserver.java | 5 +- .../pcj/fluo/app/observers/TripleObserver.java | 49 ++--- .../pcj/fluo/app/query/FluoQueryColumns.java | 27 ++- .../fluo/app/query/FluoQueryMetadataCache.java | 181 +++++++++++------- .../fluo/app/query/FluoQueryMetadataDAO.java | 5 + .../fluo/app/query/MetadataCacheSupplier.java | 70 +++++-- .../fluo/app/query/StatementPatternIdCache.java | 89 +++++++++ .../query/StatementPatternIdCacheSupplier.java | 74 ++++++++ .../app/query/StatementPatternIdManager.java | 90 +++++++++ .../app/util/BindingHashShardingFunction.java | 178 ++++++++++++++++++ .../pcj/fluo/app/util/TriplePrefixUtils.java | 62 +++++++ .../app/query/FluoQueryMetadataCacheTest.java | 18 ++ .../app/query/StatementPatternIdCacheTest.java | 55 ++++++ .../util/BindingHashShardingFunctionTest.java | 66 +++++++ .../fluo/app/util/TriplePrefixUtilsTest.java | 37 ++++ .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 29 +-- .../indexing/pcj/fluo/integration/BatchIT.java | 129 +++++++------ .../pcj/fluo/integration/CreateDeleteIT.java | 20 +- .../integration/CreateDeletePeriodicPCJ.java | 7 +- .../pcj/fluo/integration/FluoLatencyIT.java | 169 ----------------- .../indexing/pcj/fluo/integration/QueryIT.java | 32 ++-- .../integration/StatementPatternIdCacheIT.java | 82 ++++++++ .../src/test/resources/log4j.properties | 43 +++++ .../rya/pcj/fluo/test/base/FluoITBase.java | 41 ++-- .../pcj/fluo/test/base/KafkaExportITBase.java | 13 +- .../pcj/functions/geo/GeoFunctionsIT.java | 14 +- 58 files changed, 1778 insertions(+), 631 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/indexingExample/src/main/java/RyaClientExample.java ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java index 3f42f1d..278f214 100644 --- a/extras/indexingExample/src/main/java/RyaClientExample.java +++ b/extras/indexingExample/src/main/java/RyaClientExample.java @@ -44,6 +44,7 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.accumulo.AccumuloIndexingConfiguration; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; @@ -75,7 +76,6 @@ public class RyaClientExample { private static final Logger log = Logger.getLogger(RyaClientExample.class); public static void main(final String[] args) throws Exception { - setupLogging(); final String accumuloUsername = "root"; final String accumuloPassword = "password"; @@ -241,6 +241,7 @@ public class RyaClientExample { // Setup the observers that will be used by the Fluo PCJ Application. 
final List<ObserverSpecification> observers = new ArrayList<>(); observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(BatchObserver.class.getName())); observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java index 0562180..ea08af5 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java @@ -23,12 +23,14 @@ import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ import com.google.common.base.Optional; public class FluoBinPruner implements BinPruner { private static final Logger log = LoggerFactory.getLogger(FluoBinPruner.class); + private static final ValueFactory vf = new ValueFactoryImpl(); private final FluoClient client; public FluoBinPruner(final FluoClient client) { @@ -67,9 +70,9 @@ public class FluoBinPruner implements BinPruner { throw new RuntimeException(); } final Column batchInfoColumn = type.get().getResultColumn(); - final String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin; + final Bytes batchInfoSpanPrefix = BindingHashShardingFunction.getShardedScanPrefix(id, vf.createLiteral(bin)); final SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn) - .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build(); + .setSpan(Span.prefix(batchInfoSpanPrefix)).build(); BatchInformationDAO.addBatch(tx, id, batchInfo); tx.commit(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.manual/src/site/markdown/pcj-updater.md ---------------------------------------------------------------------- diff --git a/extras/rya.manual/src/site/markdown/pcj-updater.md b/extras/rya.manual/src/site/markdown/pcj-updater.md index 11cb560..157bb13 100644 --- a/extras/rya.manual/src/site/markdown/pcj-updater.md +++ b/extras/rya.manual/src/site/markdown/pcj-updater.md @@ -508,6 +508,192 @@ will fail. If that occurs, look up the YARN Application-Id in the YARN UI, or with the command `yarn application -list` and then kill it with a command similar to: `yarn application -kill application_1503402439867_0009`. +## Performance Optimizations +There are a number of optimizations that can boost the performance of the Rya PCJ Updater. 
The main +bottleneck that will prevent an instance of the PCJ Updater from scaling is the load that the application +places on the Accumulo Tablet Servers. In an effort to mitigate this, there are a number of things that can be done to +lighten the scan load on each Tablet Server and cut down on the number of scans overall. + +#### Metadata Caching +The PCJ Updater uses metadata associated with each query node to route and process intermediate query results. +New queries can be dynamically added to and deleted from the Rya PCJ Updater, but for the most part this data +is static and can be cached. So the PCJ Updater aggressively caches whatever metadata it can to avoid lookups. In addition, +each time the Updater processes new statements, it must match the new triples with StatementPatterns registered with the +PCJ Updater. The ids of these StatementPatterns are also cached to avoid costly scans for new StatementPatterns. All +metadata caches are active and utilized by default. + +#### Load Balancing and Sharding +Another important optimization that can drastically boost performance is sharding. Sharding ensures that +the processing and scanning load is equally distributed among all Tablet Servers. By default, the PCJ Updater +shards its rows. However, to take advantage of this optimization, the user must pre-split the Fluo table after +initializing the application. 
To do this, add the following properties to the fluo.properties file for the application +before initializing: + +``` +# RowHasher Split Properties +# ------------------- +fluo.app.recipes.optimizations.SP=org.apache.fluo.recipes.core.data.RowHasher$Optimizer +fluo.app.recipes.optimizations.J=org.apache.fluo.recipes.core.data.RowHasher$Optimizer +fluo.app.recipes.optimizations.A=org.apache.fluo.recipes.core.data.RowHasher$Optimizer +fluo.app.recipes.optimizations.PR=org.apache.fluo.recipes.core.data.RowHasher$Optimizer +fluo.app.recipes.optimizations.Q=org.apache.fluo.recipes.core.data.RowHasher$Optimizer + +fluo.app.recipes.rowHasher.SP.numTablets=8 +fluo.app.recipes.rowHasher.J.numTablets=8 +fluo.app.recipes.rowHasher.A.numTablets=8 +fluo.app.recipes.rowHasher.PR.numTablets=8 +fluo.app.recipes.rowHasher.Q.numTablets=8 + +``` + +The above properties are used by the Fluo RowHasher recipe, which splits the Fluo +table along each specified prefix. The properties above indicate that the Fluo table +will be split across 8 tablets along each of the prefixes SP, J, A, PR, and Q. Each of these +prefixes corresponds to prefixes of distinct result ranges stored in the Rya PCJ Updater. +The choice of 8 tablets is arbitrary and should be based on available resources. After +adding the above properties and initializing the application, execute the following command: + +``` +./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable + +``` + +This command generates and applies the splits indicated by the above properties. See +the Fluo [row hash prefix recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/row-hasher/) +for more details. + +#### Compaction Strategies for Garbage Collecting +The PCJ Updater application retains processed notifications and triples that +are marked for deletion until a minor or major compaction runs and triggers the +Fluo Garbage Collection iterator. 
Because new Triples are processed by the TripleObserver +and then immediately marked as deleted, it's quite possible that a large number of "deleted" triples +could build up before they are actually removed from the table. Similarly, +old notifications that have already been processed and marked as deleted can pile up as well. As these entries +build up in the Fluo table, Tablet Servers have to scan over these entries when the +Fluo NotificationIterator is run, creating extra work for the Tablet Servers. To count the number +of "DELETED" notifications and triples that are in the table, run the following commands: + +``` + #counts number of old notifications + ./bin/fluo scan <app_name> --raw -c ntfy | grep -c 'DELETE' + #counts number of deleted triples + ./bin/fluo scan <app_name> --raw -c triples | grep -c 'DELETE' +``` + +It's good practice to monitor how these quantities grow after starting the application to get +a sense of whether or not Accumulo compactions are being executed frequently enough. One possible +optimization to increase the compaction rate is to adjust the compaction ratio through the Accumulo shell. +After initializing the PCJ Updater application, execute the following command in the Accumulo shell: + +``` +config -t <app_table_name> -s table.compaction.major.ratio=1.0 +``` + +This sets the major compaction ratio of the Fluo table to 1.0, where the lower the ratio, the more frequently +major compactions will occur. Another approach is to compact on a specified Range. Fluo supports +periodically compacting on specified ranges. To do this, add any of the following hex ranges to the +fluo.properties file before initializing the PCJ Updater. 
+ +``` +#Rya PCJ Updater Compaction ranges +#Compact all triples +fluo.app.recipes.transientRange.triples=543C3C3A3E3E:543C3C3A3E3EFF +#Compact all statement pattern results +fluo.app.recipes.transientRange.statementPattern=53503C3C3A3E3E:53503C3C3A3E3EFF +#Compact all join results +fluo.app.recipes.transientRange.join=4A3C3C3A3E3E:4A3C3C3A3E3EFF +#Compact all aggregation results +fluo.app.recipes.transientRange.aggregation=413C3C3A3E3E:413C3C3A3E3EFF +#Compact all projection results +fluo.app.recipes.transientRange.projection=50523C3C3A3E3E:50523C3C3A3E3EFF +#Compact full table +fluo.app.recipes.transientRange.fullTable=:FF +``` +To apply the above transient range properties, execute the following Fluo command: + +``` +./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier> +``` + +The above command executes a compaction over each transient range that is included in the fluo.properties file. +These compactions will run indefinitely and execute every time the compaction period (in ms) elapses. The multiplier +is an optional parameter that indicates how much the periodic compaction script will throttle compactions if they +begin taking too long. See the Fluo [transient data recipe](https://fluo.apache.org/docs/fluo-recipes/1.1.0-incubating/transient/) +for more information. + +If running compactions proves to be extremely costly and begins to affect application performance, it's best to +only do a range compaction on the triples. This will clean up old, deleted triples and any notifications related to triples. +If the Tablet Servers can handle the additional load, try adding additional ranges to clean up +old data. Note that when the full table range is added, there is no need to include any of the other +ranges. + +#### Cheatsheet for Applying the Optimizations +To apply the above optimizations, + +1. 
Stop the Rya PCJ Updater app if it is running by +executing the following command +``` +./bin/fluo stop <app_name> +``` + 2. Once the application is stopped, add any optimization related properties to the fluo.properties file. + These properties include the row sharding properties and the range compaction properties outlined + above. Note that it is best to add all of the row shard properties to ensure an even data distribution + among all of the Tablet Servers, and start off by adding only the triple transient range property for Range compactions. + + 3. Initialize the Rya PCJ Updater application by executing the following command in Fluo + ``` + ./bin/fluo init <app_name> + ``` + 4. Apply the row shard splits by executing + ``` + ./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable +``` +5. Check the PCJ Updater table in the Accumulo UI to ensure that the table has the +correct number of Tablet Servers + +6. Start the application by executing the following command +``` +./bin/fluo start <app_name> +``` +7. Apply the transient range compactions by executing the following command +``` +./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier> +``` + +#### Monitoring Performance +Once the application is running, there are a number of ways to assess its performance. The +primary method is to track the number of outstanding notifications that are queued and waiting +to be processed. This can be done by executing the Fluo wait command + +``` +./bin/fluo wait <app_name> +``` + +When this script executes, it polls Fluo every ten seconds to determine how many unprocessed notifications +are queued up. If this number grows over time then the application needs more workers to handle the ingest load. 
+ +Before deploying more workers, first verify that the Tablet Servers can handle an increased scan load by checking +the Accumulo UI to see if the Tablet Servers have any queued scans for the PCJ Updater table. If there are a large +number of queued scans for the table, adding more workers might not be possible. It may be necessary to lower the +ingest rate. If the Tablet Servers are keeping up, deploy additional workers by stopping the application, updating +the fluo.properties to use more workers, and re-initializing and starting the application. + +Another way to assess the performance of the application is to monitor the scan rate through the Accumulo UI. If the scan +rate is staying approximately constant over time (or increasing very slowly), then the application is performing as expected. +In general, the scan rate increases because + +1. The number of intermediate results increases over time +2. The number of "deleted" notifications that need to be flushed increases over time +3. The number of "deleted" triples that need to be flushed increases over time + +The first item is unavoidable if there is no age off policy for the PCJ Updater application (which is currently the case). +However, 2 and 3 are avoidable by applying the range compaction optimizations discussed above. So it's important to monitor the +scan rate (and the number of old triples and notifications as discussed above) to assess the health of the application. + +Finally an additional item to monitor is which iterators are running in the Fluo table. This can be done by regularly running the listscans +command in the Accumulo shell. This gives a sense of how the Tablet Servers are being used. It helps determine whether they are +spending most of their time finding new notifications (internal Fluo work), or issuing scans that are specific to Rya PCJ Updater Observers. 
+ [Apache Fluo]: https://fluo.apache.org/ [Apache Fluo 1.0.0-incubating Documentation]: https://fluo.apache.org/docs/fluo/1.0.0-incubating/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java index 0d97b2f..e0123be 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java @@ -21,19 +21,22 @@ package org.apache.rya.indexing.pcj.fluo.api; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; +import java.util.Optional; +import java.util.Set; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdManager; import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import 
org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.openrdf.query.BindingSet; @@ -79,7 +82,8 @@ public class DeleteFluoPcj { * Index. (not null) * @param pcjId - The PCJ ID for the query that will removed from the Fluo * application. (not null) - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException - thrown when Fluo app is unable to read FluoQuery associated + * with given pcjId. */ public void deletePcj(final FluoClient client, final String pcjId) throws UnsupportedQueryException { requireNonNull(client); @@ -104,7 +108,8 @@ public class DeleteFluoPcj { * @param tx - Transaction of a given Fluo table. (not null) * @param pcjId - Id of query. (not null) * @return list of Node IDs associated with the query {@code pcjId}. - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException - thrown when Fluo app is unable to read FluoQuery associated + * with given pcjId. */ private List<String> getNodeIds(Transaction tx, String pcjId) throws UnsupportedQueryException { requireNonNull(tx); @@ -131,10 +136,18 @@ public class DeleteFluoPcj { requireNonNull(pcjId); try (final Transaction typeTx = tx) { + Set<String> spNodeIds = new HashSet<>(); + //remove metadata associated with each nodeId and store statement pattern nodeIds for (final String nodeId : nodeIds) { final NodeType type = NodeType.fromNodeId(nodeId).get(); + if(type == NodeType.STATEMENT_PATTERN) { + spNodeIds.add(nodeId); + } deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns()); } + //Use stored statement pattern nodeIds to update list of stored statement pattern nodeIds + //in Fluo table + StatementPatternIdManager.removeStatementPatternIds(typeTx, spNodeIds); typeTx.commit(); } } @@ -169,37 +182,11 @@ public class DeleteFluoPcj { final NodeType type = NodeType.fromNodeId(nodeId).get(); Transaction tx = client.newTransaction(); - while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) { - tx = 
client.newTransaction(); - } - } - - private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) { - requireNonNull(tx); - requireNonNull(nodeId); - requireNonNull(column); - - return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build(); - } - - private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) { - requireNonNull(tx); - requireNonNull(scanner); - requireNonNull(column); - - try (Transaction ntx = tx) { - int count = 0; - final Iterator<RowColumnValue> iter = scanner.iterator(); - while (iter.hasNext() && count < batchSize) { - final Bytes row = iter.next().getRow(); - count++; - tx.delete(row, column); - } - - final boolean hasNext = iter.hasNext(); - tx.commit(); - return hasNext; - } + Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix()); + SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setColumn(type.getResultColumn()) + .setSpan(Span.prefix(prefixBytes)).setBatchSize(batchSize).setNodeId(Optional.of(nodeId)).build(); + BatchInformationDAO.addBatch(tx, nodeId, batch); + tx.commit(); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java index ddbaaaf..4de4069 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java @@ -30,8 +30,11 @@ import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.SnapshotBase; import 
org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; @@ -63,7 +66,7 @@ public class GetQueryReport { * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) * @return A map from Query ID to QueryReport that holds a report for all of * the queries that are being managed within the fluo app. - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException */ public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) throws UnsupportedQueryException { checkNotNull(fluo); @@ -86,7 +89,7 @@ public class GetQueryReport { * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) * @param queryId - The ID of the query to fetch. (not null) * @return A report that was built for the query. - * @throws UnsupportedQueryException + * @throws UnsupportedQueryException */ public QueryReport getReport(final FluoClient fluo, final String queryId) throws UnsupportedQueryException { checkNotNull(fluo); @@ -132,14 +135,20 @@ public class GetQueryReport { checkNotNull(nodeId); checkNotNull(bindingSetColumn); + NodeType type = NodeType.fromNodeId(nodeId).get(); + Bytes prefixBytes = Bytes.of(type.getNodeTypePrefix()); + // Limit the scan to the binding set column and node id. 
- final RowScanner rows = sx.scanner().over(Span.prefix(nodeId)).fetch(bindingSetColumn).byRow().build(); + final RowScanner rows = sx.scanner().over(Span.prefix(prefixBytes)).fetch(bindingSetColumn).byRow().build(); BigInteger count = BigInteger.valueOf(0L); for (ColumnScanner columns : rows) { - count = count.add( BigInteger.ONE ); - } - + String row = BindingSetRow.makeFromShardedRow(prefixBytes, columns.getRow()).getNodeId(); + if (row.equals(nodeId)) { + count = count.add(BigInteger.ONE); + } + } + return count; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java index 1e86836..b784fbf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java @@ -19,24 +19,24 @@ package org.apache.rya.indexing.pcj.fluo.api; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.addTriplePrefixAndConvertToBytes; import java.util.Collection; import java.util.Collections; import java.util.Map; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; - -import com.google.common.base.Optional; - import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; import org.apache.rya.api.domain.RyaStatement; import 
org.apache.rya.api.resolver.triple.TripleRow; import org.apache.rya.api.resolver.triple.TripleRowResolverException; import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import com.google.common.base.Optional; /** * Insert a batch of Triples into. This will trigger observers that will update @@ -79,7 +79,7 @@ public class InsertTriples { try(Transaction tx = fluo.newTransaction()) { for(final RyaStatement triple : triples) { try { - tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(""))); + tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(""))); } catch (final TripleRowResolverException e) { log.error("Could not convert a Triple into the SPO format: " + triple); } @@ -88,7 +88,7 @@ public class InsertTriples { tx.commit(); } } - + /** * Inserts a triple into Fluo. * @@ -101,9 +101,9 @@ public class InsertTriples { insert(fluo, Collections.singleton(triple)); } - + /** - * Insert a batch of RyaStatements into Fluo. + * Insert a batch of RyaStatements into Fluo. * * @param fluo - A connection to the Fluo table that will be updated. (not null) * @param triples - The triples to insert. (not null) @@ -116,7 +116,7 @@ public class InsertTriples { for(final RyaStatement triple : triples) { Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); try { - tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); + tx.set(spoFormat(triple), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); } catch (final TripleRowResolverException e) { log.error("Could not convert a Triple into the SPO format: " + triple); } @@ -125,8 +125,6 @@ public class InsertTriples { tx.commit(); } } - - /** * Converts a triple into a byte[] holding the Rya SPO representation of it. 
@@ -135,10 +133,10 @@ public class InsertTriples { * @return The Rya SPO representation of the triple. * @throws TripleRowResolverException The triple could not be converted. */ - public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { + public static Bytes spoFormat(final RyaStatement triple) throws TripleRowResolverException { checkNotNull(triple); final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); - return spoRow.getRow(); + return addTriplePrefixAndConvertToBytes(spoRow.getRow()); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index fd624eb..4839c04 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -1,16 +1,22 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rya</groupId> @@ -60,6 +66,14 @@ <artifactId>fluo-api</artifactId> </dependency> <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-accumulo</artifactId> + </dependency> + <dependency> <groupId>org.openrdf.sesame</groupId> <artifactId>sesame-queryrender</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java new file mode 100644 index 0000000..62fc48c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * This class provides the common functionality for generating sharded row + * keys for any class that extends it. + * + */ +public abstract class AbstractNodeUpdater { + + /** + * Generates a sharded row key from the provided arguments for inserting new results + * into the Fluo table. The row key is of the form node prefix + shardId + nodeId + BindingSet values. 
+ * The shardId is formed from a hash of the first Binding value as indicated by the VariableOrder. + * @param nodeId - query nodeId corresponding to new result + * @param varOrder - varOrder used to order Binding values for new row key + * @param bs - BindingSet that values will be taken from to form key + * @return - sharded row key formed from the provided arguments + */ + public Bytes makeRowKey(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) { + return BindingHashShardingFunction.addShard(nodeId, varOrder, bs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java index 2e41cb1..3acf3e9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java @@ -42,7 +42,6 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.openrdf.model.Literal; @@ -65,7 +64,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Updates the results of an Aggregate node when its 
child has added a new Binding Set to its results. */ @DefaultAnnotation(NonNull.class) -public class AggregationResultUpdater { +public class AggregationResultUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(AggregationResultUpdater.class); private static final AggregationStateSerDe AGG_STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); @@ -104,7 +103,7 @@ public class AggregationResultUpdater { // The Row ID for the Aggregation State that needs to be updated is defined by the Group By variables. final String aggregationNodeId = aggregationMetadata.getNodeId(); final VariableOrder groupByVars = aggregationMetadata.getGroupByVariableOrder(); - final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, groupByVars, childBindingSet); + final Bytes rowId = makeRowKey(aggregationNodeId, groupByVars, childBindingSet); // Load the old state from the bytes if one was found; otherwise initialize the state. final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET) ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java index 2e45ea6..9b7558f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java @@ -21,15 +21,17 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import java.util.Objects; + import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.Immutable; /** - * The values of an Accumulo Row ID for a row that stores a Binding set for - * a specific Node ID of a query. + * The values of an Accumulo Row ID for a row that stores a Binding set for a specific Node ID of a query. */ @Immutable @DefaultAnnotation(NonNull.class) @@ -77,4 +79,39 @@ public class BindingSetRow { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + + /** + * Creates a BindingSetRow from a sharded row entry, where the row is sharded according to + * {@link BindingHashShardingFunction}. + * + * @param prefixBytes - prefix of the node type that the row corresponds to (see prefixes in + * {@link IncrementalUpdateConstants}). 
+ * @param row - row that BindingSetRow is created from + * @return - BindingSetRow object + */ + public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { + return make(BindingHashShardingFunction.removeHash(prefixBytes, row)); + } + + @Override + public String toString() { + return "NodeId: " + nodeId + "\n" + "BindingSet String: " + bindingSetString; + } + + @Override + public boolean equals(Object other) { + if(this == other) { return true;} + + if (other instanceof BindingSetRow) { + BindingSetRow row = (BindingSetRow) other; + return Objects.equals(nodeId, row.nodeId) && Objects.equals(bindingSetString, row.bindingSetString); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, bindingSetString); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java index 6642780..b50e862 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java @@ -28,22 +28,21 @@ import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; /** * This class creates results for the ConstructQuery. This class applies the {@link ConstructGraph} * associated with the Construct Query to generate a collection of {@link RyaStatement}s. These statements - * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column + * are then used to form a {@link RyaSubGraph} that is serialized and stored as a value in the Column * {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}. * */ -public class ConstructQueryResultUpdater { +public class ConstructQueryResultUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(ConstructQueryResultUpdater.class); private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe(); - + /** * Updates the Construct Query results by applying the {@link ConnstructGraph} to * create a {@link RyaSubGraph} and then writing the subgraph to {@link FluoQueryColumns#CONSTRUCT_STATEMENTS}. @@ -52,15 +51,15 @@ public class ConstructQueryResultUpdater { * @param metadata - metadata that the ConstructProjection is extracted from */ public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) { - + String nodeId = metadata.getNodeId(); VariableOrder varOrder = metadata.getVariableOrder(); Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS; ConstructGraph graph = metadata.getConstructGraph(); String parentId = metadata.getParentNodeId(); - + // Create the Row Key for the emitted binding set. It does not contain visibilities. - final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs); + final Bytes resultRow = makeRowKey(nodeId, varOrder, bs); // If this is a new binding set, then emit it. 
if(tx.get(resultRow, column) == null || varOrder.getVariableOrders().size() < bs.size()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 2cc9f77..c829156 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -57,7 +56,7 @@ import info.aduna.iteration.CloseableIteration; * Set to its results. */ @DefaultAnnotation(NonNull.class) -public class FilterResultUpdater { +public class FilterResultUpdater extends AbstractNodeUpdater { private static final Logger log = LoggerFactory.getLogger(FilterResultUpdater.class); @@ -114,7 +113,7 @@ public class FilterResultUpdater { // Create the Row Key for the emitted binding set. It does not contain visibilities. 
final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); - final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet); + final Bytes resultRow = makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet); // Serialize and emit BindingSet final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java index 602fd9d..e0483ab 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; +import static org.apache.rya.indexing.pcj.fluo.app.util.TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; @@ -41,8 +42,15 @@ public class IncUpdateDAO { private static final WholeRowTripleResolver tr = new WholeRowTripleResolver(); + /** + * Deserializes a triple stored in the Fluo table. 
+ * @param row - serialized triple + * @return - triple deserialized as a RyaStatement + */ public static RyaStatement deserializeTriple(final Bytes row) { - final byte[] rowArray = row.toArray(); + + checkNotNull(row); + final byte[] rowArray = removeTriplePrefixAndConvertToByteArray(row); RyaStatement rs = null; try { @@ -55,6 +63,7 @@ public class IncUpdateDAO { } public static String getTripleString(final RyaStatement rs) { + checkNotNull(rs); final String subj = rs.getSubject().getData() + TYPE_DELIM + URI_TYPE; final String pred = rs.getPredicate().getData() + TYPE_DELIM + URI_TYPE; final String objData = rs.getObject().getData(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index 5405837..63b9715 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -30,15 +30,18 @@ public class IncrementalUpdateConstants { public static final String TYPE_DELIM = "<<~>>"; //to be used in construction of id for each node - public static final String SP_PREFIX = "STATEMENT_PATTERN"; - public static final String JOIN_PREFIX = "JOIN"; - public static final String FILTER_PREFIX = "FILTER"; - public static final String AGGREGATION_PREFIX = "AGGREGATION"; - public static final String QUERY_PREFIX = "QUERY"; - public static final String PROJECTION_PREFIX = "PROJECTION"; - public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; - public 
static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; - + public static final String TRIPLE_PREFIX = "T"; + public static final String SP_PREFIX = "SP"; + public static final String JOIN_PREFIX = "J"; + public static final String FILTER_PREFIX = "F"; + public static final String AGGREGATION_PREFIX = "A"; + public static final String QUERY_PREFIX = "Q"; + public static final String PROJECTION_PREFIX = "PR"; + public static final String CONSTRUCT_PREFIX = "C"; + public static final String PERIODIC_QUERY_PREFIX = "PE"; + + public static final String STATEMENT_PATTERN_ID = "SP_ID"; + //binding name reserved for periodic bin id for periodic query results public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index fb3ee0c..bcce77d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -19,8 +19,6 @@ package org.apache.rya.indexing.pcj.fluo.app; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import java.util.ArrayList; import java.util.HashSet; @@ -43,13 +41,12 @@ import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; import 
org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; @@ -65,14 +62,11 @@ import edu.umd.cs.findbugs.annotations.NonNull; * new Binding Set to its results. 
*/ @DefaultAnnotation(NonNull.class) -public class JoinResultUpdater { +public class JoinResultUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(JoinResultUpdater.class); - private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter(); - - private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); + private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache(); /** * Updates the results of a Join node when one of its children has added a @@ -130,7 +124,7 @@ public class JoinResultUpdater { Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, siblingId); Column siblingColumn = getScanColumnFamily(siblingId); Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize()); - + // Iterates over the resulting BindingSets from the join. final Iterator<VisibilityBindingSet> newJoinResults; if(emittingSide == Side.LEFT) { @@ -145,8 +139,8 @@ public class JoinResultUpdater { final VisibilityBindingSet newJoinResult = newJoinResults.next(); // Create the Row Key for the emitted binding set. It does not contain visibilities. - final Bytes resultRow = RowKeyUtil.makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult); - + final Bytes resultRow = makeRowKey(joinMetadata.getNodeId(), joinVarOrder, newJoinResult); + // Only insert the join Binding Set if it is new or BindingSet contains values not used in resultRow. if(tx.get(resultRow, FluoQueryColumns.JOIN_BINDING_SET) == null || joinVarOrder.getVariableOrders().size() < newJoinResult.size()) { // Create the Node Value. It does contain visibilities. 
@@ -159,7 +153,7 @@ public class JoinResultUpdater { tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, nodeValueBytes); } } - + // if batch limit met, there are additional entries to process // update the span and register updated batch job if (rowColumn.isPresent()) { @@ -183,18 +177,18 @@ public class JoinResultUpdater { public static enum Side { LEFT, RIGHT; } - - + + /** * Fetches batch to be processed by scanning over the Span specified by the * {@link JoinBatchInformation}. The number of results is less than or equal * to the batch size specified by the JoinBatchInformation. - * + * * @param tx - Fluo transaction in which batch operation is performed * @param siblingSpan - span of sibling to retrieve elements to join with * @param bsSet- set that batch results are added to * @return Set - containing results of sibling scan. - * @throws Exception + * @throws Exception */ private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int batchSize) throws Exception { @@ -222,7 +216,7 @@ public class JoinResultUpdater { return Optional.absent(); } } - + /** * Creates a Span for the sibling node to retrieve BindingSets to join with * @param tx @@ -231,29 +225,19 @@ public class JoinResultUpdater { * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update * @return Span to retrieve sibling node's BindingSets to form join results */ - private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) { + private Span getSpan(TransactionBase tx, final String childId, final VisibilityBindingSet childBindingSet, final String siblingId) { // Get the common variable orders. These are used to build the prefix. 
final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); final List<String> commonVars = getCommonVars(childVarOrder, siblingVarOrder); - // Get the Binding strings - final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); - final String[] childBindingArray = childBindingSetString.split("\u0001"); - final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); - - // Create the prefix that will be used to scan for binding sets of the sibling node. - // This prefix includes the sibling Node ID and the common variable values from - // childBindingSet. - String siblingScanPrefix = ""; - for(int i = 0; i < commonVars.size(); i++) { - if(siblingScanPrefix.length() == 0) { - siblingScanPrefix = childBindingStrings[i]; - } else { - siblingScanPrefix += DELIM + childBindingStrings[i]; - } + Bytes siblingScanPrefix = null; + if(!commonVars.isEmpty()) { + siblingScanPrefix = makeRowKey(siblingId, new VariableOrder(commonVars), childBindingSet); + } else { + siblingScanPrefix = makeRowKey(siblingId, siblingVarOrder, childBindingSet); } - siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix; + return Span.prefix(siblingScanPrefix); } @@ -277,13 +261,13 @@ public class JoinResultUpdater { return removeBinIdFromVarOrder(queryDao.readFilterMetadata(tx, nodeId).getVariableOrder()); case JOIN: return removeBinIdFromVarOrder(queryDao.readJoinMetadata(tx, nodeId).getVariableOrder()); - case PROJECTION: + case PROJECTION: return removeBinIdFromVarOrder(queryDao.readProjectionMetadata(tx, nodeId).getVariableOrder()); default: throw new IllegalArgumentException("Could not figure out the variable order for node with ID: " + nodeId); } } - + private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) { List<String> varOrderList = varOrder.getVariableOrders(); 
if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) { @@ -311,7 +295,7 @@ public class JoinResultUpdater { final List<String> commonVars = new ArrayList<>(); - // Only need to iteratre through the shorted order's length. + // Only need to iterate through the shorter order's length. final Iterator<String> vars1It = vars1.iterator(); final Iterator<String> vars2It = vars2.iterator(); while(vars1It.hasNext() && vars2It.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java index cb331cf..9b9b3ae 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/PeriodicQueryUpdater.java @@ -27,7 +27,6 @@ import org.apache.fluo.api.data.Column; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; import org.openrdf.model.Literal; @@ -39,10 +38,10 @@ import org.openrdf.query.algebra.evaluation.QueryBindingSet; /** * This class adds the appropriate BinId Binding to each BindingSet that it processes.
The BinIds * are used to determine which period a BindingSet (with a temporal Binding) falls into so that - * a user can receive periodic updates for a registered query. + * a user can receive periodic updates for a registered query. * */ -public class PeriodicQueryUpdater { +public class PeriodicQueryUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(PeriodicQueryUpdater.class); private static final ValueFactory vf = new ValueFactoryImpl(); @@ -65,9 +64,9 @@ public class PeriodicQueryUpdater { binnedBs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(id)); VisibilityBindingSet visibilityBindingSet = new VisibilityBindingSet(binnedBs, bs.getVisibility()); Bytes periodicBsBytes = BS_SERDE.serialize(visibilityBindingSet); - - //create row - final Bytes resultRow = RowKeyUtil.makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), binnedBs); + + //create row + final Bytes resultRow = makeRowKey(metadata.getNodeId(), metadata.getVariableOrder(), visibilityBindingSet); Column col = FluoQueryColumns.PERIODIC_QUERY_BINDING_SET; tx.set(resultRow, col, periodicBsBytes); } @@ -75,8 +74,8 @@ public class PeriodicQueryUpdater { /** * This method returns the end times of all period windows containing the time contained in - * the BindingSet. - * + * the BindingSet. 
+ * * @param metadata * @return Set of period bin end times */ @@ -97,7 +96,7 @@ public class PeriodicQueryUpdater { private long getRightBinEndPoint(long eventDateTime, long periodDuration) { return (eventDateTime / periodDuration + 1) * periodDuration; } - + private long getLeftBinEndPoint(long eventTime, long periodDuration) { return (eventTime / periodDuration) * periodDuration; } @@ -116,7 +115,7 @@ public class PeriodicQueryUpdater { long rightEventBin = getRightBinEndPoint(eventDateTime, periodDuration); //get the bin left of the current moment for comparison long currentBin = getLeftBinEndPoint(System.currentTimeMillis(), periodDuration); - + if(currentBin >= rightEventBin) { long numBins = (windowDuration -(currentBin - rightEventBin))/periodDuration; for(int i = 0; i < numBins; i++) { @@ -132,6 +131,6 @@ public class PeriodicQueryUpdater { return binIds; } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java index f9d8257..e928e3d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ProjectionResultUpdater.java @@ -26,7 +26,6 @@ import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -40,7 +39,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * new Binding Set to its results. */ @DefaultAnnotation(NonNull.class) -public class ProjectionResultUpdater { +public class ProjectionResultUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(QueryResultUpdater.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); @@ -65,7 +64,7 @@ public class ProjectionResultUpdater { log.trace( "Transaction ID: " + tx.getStartTimestamp() + "\n" + "Node ID: " + projectionMetadata.getNodeId() + "\n" + - "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" + + "Parent Node ID: " + projectionMetadata.getParentNodeId() + "\n" + "Child Node ID: " + projectionMetadata.getChildNodeId() + "\n" + "Child Binding Set:\n" + childBindingSet + "\n"); @@ -73,12 +72,13 @@ public class ProjectionResultUpdater { final VariableOrder queryVarOrder = projectionMetadata.getVariableOrder(); final VariableOrder projectionVarOrder = projectionMetadata.getProjectedVars(); final BindingSet queryBindingSet = BindingSetUtil.keepBindings(projectionVarOrder, childBindingSet); + VisibilityBindingSet projectedBs = new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()); // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables. - Bytes resultRow = RowKeyUtil.makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, queryBindingSet); + Bytes resultRow = makeRowKey(projectionMetadata.getNodeId(), queryVarOrder, projectedBs); // Create the Binding Set that goes in the Node Value. It does contain visibilities. 
- final Bytes nodeValueBytes = BS_SERDE.serialize(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility())); + final Bytes nodeValueBytes = BS_SERDE.serialize(projectedBs); log.trace( "Transaction ID: " + tx.getStartTimestamp() + "\n" + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index 37d7256..6c383e2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -38,7 +37,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * new Binding Set to its results. 
*/ @DefaultAnnotation(NonNull.class) -public class QueryResultUpdater { +public class QueryResultUpdater extends AbstractNodeUpdater { private static final Logger log = Logger.getLogger(QueryResultUpdater.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); @@ -70,7 +69,7 @@ public class QueryResultUpdater { final VariableOrder queryVarOrder = queryMetadata.getVariableOrder(); // Create the Row Key for the result. If the child node groups results, then the key must only contain the Group By variables. - final Bytes resultRow = RowKeyUtil.makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet); + final Bytes resultRow = makeRowKey(queryMetadata.getNodeId(), queryVarOrder, childBindingSet); // Create the Binding Set that goes in the Node Value. It does contain visibilities. final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java index 9584a10..c077310 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java @@ -23,6 +23,8 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import 
org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; /** * This class provides common functionality for implementations of {@link BatchBindingSetUpdater}. @@ -30,6 +32,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; */ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetUpdater { + protected static final FluoQueryMetadataCache CACHE = MetadataCacheSupplier.getOrCreateCache(); + /** * Updates the Span to create a new {@link BatchInformation} object to be fed to the * {@link BatchObserver}. This message is called in the event that the BatchBindingSetUpdater @@ -41,7 +45,7 @@ public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetU public static Span getNewSpan(RowColumn newStart, Span oldSpan) { return new Span(newStart, oldSpan.isStartInclusive(), oldSpan.getEnd(), oldSpan.isEndInclusive()); } - + /** * Cleans up old batch job. This method is meant to be called by any overriding method * to clean up old batch tasks. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java index 2da3e39..e15fa8f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java @@ -36,7 +36,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation * Create AbstractBatchInformation * @param batchSize - size of batch to be processed * @param task - type of task processed (Add, Delete, Udpate) - * @param column - Cpolumn that Span notification is applied + * @param column - Column that Span notification is applied * @param span - span used to indicate where processing should begin */ public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) { @@ -62,7 +62,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation public void setSpan(Span span) { this.span = span; } - + @Override public String toString() { return new StringBuilder() @@ -74,7 +74,7 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation .append("}") .toString(); } - + @Override public boolean equals(Object other) { if (this == other) { @@ -94,6 +94,6 @@ public abstract class AbstractSpanBatchInformation extends BasicBatchInformation public int hashCode() { return Objects.hash(super.getBatchSize(), span, super.getColumn(), super.getTask()); } - + } 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java index a266341..a883be1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java @@ -37,9 +37,8 @@ import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; @@ -53,7 +52,6 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { private static final Logger log = Logger.getLogger(JoinBatchBindingSetUpdater.class); private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); - private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); /** * Processes {@link 
JoinBatchInformation}. Updates the BindingSets @@ -65,7 +63,7 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { * entries that need to be updated exceeds the batch size, the row of the * first unprocessed BindingSets is used to create a new JoinBatch job to * process the remaining BindingSets. - * @throws Exception + * @throws Exception */ @Override public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { @@ -100,15 +98,15 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { newJoinResults = joinAlgorithm.newRightResult(bsSet.iterator(), bs); } - // Insert the new join binding sets to the Fluo table. - final JoinMetadata joinMetadata = dao.readJoinMetadata(tx, nodeId); + // Read join metadata, create new join BindingSets and insert them into the Fluo table. + final JoinMetadata joinMetadata = CACHE.readJoinMetadata(tx, nodeId); final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); while (newJoinResults.hasNext()) { final VisibilityBindingSet newJoinResult = newJoinResults.next(); //create BindingSet value Bytes bsBytes = BS_SERDE.serialize(newJoinResult); //make rowId - Bytes rowKey = RowKeyUtil.makeRowKey(nodeId, joinVarOrder, newJoinResult); + Bytes rowKey = BindingHashShardingFunction.addShard(nodeId, joinVarOrder, newJoinResult); final Column col = FluoQueryColumns.JOIN_BINDING_SET; processTask(tx, task, rowKey, col, bsBytes); } @@ -144,12 +142,12 @@ public class JoinBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { * Fetches batch to be processed by scanning over the Span specified by the * {@link JoinBatchInformation}. The number of results is less than or equal * to the batch size specified by the JoinBatchInformation. 
- * + * * @param tx - Fluo transaction in which batch operation is performed * @param batch - batch order to be processed * @param bsSet- set that batch results are added to * @return Set - containing results of sibling scan. - * @throws Exception + * @throws Exception */ private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
