http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index 20d3005..dc4b3b4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -20,25 +20,25 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static com.google.common.base.Preconditions.checkNotNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.observer.AbstractObserver; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Notified when the results of a node have been updated to include a new Binding @@ -47,6 +47,7 @@ import org.apache.fluo.api.observer.AbstractObserver; */ @DefaultAnnotation(NonNull.class) public abstract class BindingSetUpdater extends AbstractObserver { + private static final Logger log = Logger.getLogger(BindingSetUpdater.class); // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -55,6 +56,7 @@ public abstract class BindingSetUpdater extends AbstractObserver { private final JoinResultUpdater joinUpdater = new JoinResultUpdater(); private final FilterResultUpdater filterUpdater = new FilterResultUpdater(); private final QueryResultUpdater queryUpdater = new QueryResultUpdater(); + private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater(); @Override public abstract ObservedColumn getObservedColumn(); @@ -63,10 +65,11 @@ public abstract class BindingSetUpdater extends AbstractObserver { * Create an {@link Observation} that defines the work that needs to be done. * * @param tx - The Fluo transaction being used for the observer notification. (not null) - * @param parsedRow - The RowID parsed into a Binding Set and Node ID. (not null) + * @param row - The row that triggered the notification. (not null) * @return An {@link Observation} that defines the work that needs to be done. + * @throws Exception A problem caused this method to fail. 
*/ - public abstract Observation parseObservation(TransactionBase tx, final BindingSetRow parsedRow); + public abstract Observation parseObservation(TransactionBase tx, Bytes row) throws Exception; @Override public final void process(final TransactionBase tx, final Bytes row, final Column col) { @@ -74,8 +77,15 @@ public abstract class BindingSetUpdater extends AbstractObserver { checkNotNull(row); checkNotNull(col); - final String bindingSetString = tx.get(row, col).toString(); - final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) ); + final Observation observation; + try { + observation = parseObservation(tx, row); + } catch (final Exception e) { + log.error("Unable to parse an Observation from a Row and Column pair, so this notification will be skipped. " + + "Row: " + row + " Column: " + col, e); + return; + } + final String observedNodeId = observation.getObservedNodeId(); final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet(); final String parentNodeId = observation.getParentId(); @@ -85,7 +95,11 @@ public abstract class BindingSetUpdater extends AbstractObserver { switch(parentNodeType) { case QUERY: final QueryMetadata parentQuery = queryDao.readQueryMetadata(tx, parentNodeId); - queryUpdater.updateQueryResults(tx, observedBindingSet, parentQuery); + try { + queryUpdater.updateQueryResults(tx, observedBindingSet, parentQuery); + } catch (final Exception e) { + throw new RuntimeException("Could not process a Query node.", e); + } break; case FILTER: @@ -101,11 +115,20 @@ public abstract class BindingSetUpdater extends AbstractObserver { final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId); try { joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin); - } catch (final BindingSetConversionException e) { + } catch (final Exception e) { throw new RuntimeException("Could not process a Join node.", e); } break; + 
case AGGREGATION: + final AggregationMetadata parentAggregation = queryDao.readAggregationMetadata(tx, parentNodeId); + try { + aggregationUpdater.updateAggregateResults(tx, observedBindingSet, parentAggregation); + } catch (final Exception e) { + throw new RuntimeException("Could not process an Aggregation node.", e); + } + break; + default: throw new IllegalArgumentException("The parent node's NodeType must be of type Filter, Join, or Query, but was " + parentNodeType); }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java index 36af898..f5c7177 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java @@ -18,19 +18,18 @@ */ package org.apache.rya.indexing.pcj.fluo.app.observers; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.query.BindingSet; -import org.apache.fluo.api.client.TransactionBase; - /** * Notified when the results of a Filter have been updated to include a new * {@link BindingSet}. 
This observer updates its parent if the new Binding Set @@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase; */ public class FilterObserver extends BindingSetUpdater { - private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -48,17 +47,17 @@ public class FilterObserver extends BindingSetUpdater { } @Override - public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { - checkNotNull(tx); - checkNotNull(parsedRow); + public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception { + requireNonNull(tx); + requireNonNull(row); // Read the Filter metadata. - final String filterNodeId = parsedRow.getNodeId(); + final String filterNodeId = BindingSetRow.make(row).getNodeId(); final FilterMetadata filterMetadata = queryDao.readFilterMetadata(tx, filterNodeId); - // Read the Binding Set that was just emmitted by the Filter. - final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); - final VisibilityBindingSet filterBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), filterVarOrder); + // Read the Visibility Binding Set from the value. + final Bytes valueBytes = tx.get(row, FluoQueryColumns.FILTER_BINDING_SET); + final VisibilityBindingSet filterBindingSet = BS_SERDE.deserialize(valueBytes); // Figure out which node needs to handle the new metadata. 
final String parentNodeId = filterMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java index 6933096..141ccc7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java @@ -18,19 +18,18 @@ */ package org.apache.rya.indexing.pcj.fluo.app.observers; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.query.BindingSet; -import org.apache.fluo.api.client.TransactionBase; - /** * Notified when the results of a Join have been updated to include a new * {@link BindingSet}. 
This observer updates its parent if the new Binding Set @@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase; */ public class JoinObserver extends BindingSetUpdater { - private final VisibilityBindingSetStringConverter converter = new VisibilityBindingSetStringConverter(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -48,16 +47,17 @@ public class JoinObserver extends BindingSetUpdater { } @Override - public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { - checkNotNull(parsedRow); + public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception { + requireNonNull(tx); + requireNonNull(row); // Read the Join metadata. - final String joinNodeId = parsedRow.getNodeId(); + final String joinNodeId = BindingSetRow.make(row).getNodeId(); final JoinMetadata joinMetadata = queryDao.readJoinMetadata(tx, joinNodeId); - // Read the Binding Set that was just emmitted by the Join. - final VariableOrder joinVarOrder = joinMetadata.getVariableOrder(); - final VisibilityBindingSet joinBindingSet = (VisibilityBindingSet) converter.convert(parsedRow.getBindingSetString(), joinVarOrder); + // Read the Visibility Binding Set from the value. + final Bytes valueBytes = tx.get(row, FluoQueryColumns.JOIN_BINDING_SET); + final VisibilityBindingSet joinBindingSet = BS_SERDE.deserialize(valueBytes); // Figure out which node needs to handle the new metadata. 
final String parentNodeId = joinMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 1238c18..28c92af 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -29,6 +29,7 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.log4j.Logger; import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; @@ -36,11 +37,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFact import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; @@ -51,13 +48,7 @@ import com.google.common.collect.ImmutableSet; public class QueryResultObserver extends AbstractObserver { private static final Logger log = Logger.getLogger(QueryResultObserver.class); - private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); - private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); - - /** - * Simplifies Visibility expressions prior to exporting PCJ results. - */ - private static final VisibilitySimplifier SIMPLIFIER = new VisibilitySimplifier(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); /** * We expect to see the same expressions a lot, so we cache the simplified forms. @@ -91,9 +82,9 @@ public class QueryResultObserver extends AbstractObserver { final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder(); for(final IncrementalResultExporterFactory builder : factories) { - try { - log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder); + try { final Optional<IncrementalResultExporter> exporter = builder.build(context); if(exporter.isPresent()) { exportersBuilder.add(exporter.get()); @@ -107,28 +98,22 @@ public class QueryResultObserver extends AbstractObserver { } @Override - public void process(final TransactionBase tx, final Bytes brow, final Column col) { + public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception { final String row = brow.toString(); - - // Read the SPARQL query and it Binding Set from the row id. 
- final String[] queryAndBindingSet = row.split(NODEID_BS_DELIM); - final String queryId = queryAndBindingSet[0]; - final String bindingSetString = tx.gets(row, col); - // Fetch the query's Variable Order from the Fluo table. - final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId); - final VariableOrder varOrder = queryMetadata.getVariableOrder(); + // Read the SPARQL query and its Binding Set from the row id. + final String queryId = row.split(NODEID_BS_DELIM)[0]; - // Create the result that will be exported. - final VisibilityBindingSet result = CONVERTER.convert(bindingSetString, varOrder); + // Read the Child Binding Set that will be exported. + final Bytes valueBytes = tx.get(brow, col); + final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes); // Simplify the result's visibilities. final String visibility = result.getVisibility(); if(!simplifiedVisibilities.containsKey(visibility)) { - final String simplified = SIMPLIFIER.simplify( visibility ); + final String simplified = VisibilitySimplifier.simplify( visibility ); simplifiedVisibilities.put(visibility, simplified); } - result.setVisibility( simplifiedVisibilities.get(visibility) ); // Export the result using each of the provided exporters. @@ -136,8 +121,21 @@ public class QueryResultObserver extends AbstractObserver { try { exporter.export(tx, queryId, result); } catch (final ResultExportException e) { - log.error("Could not export a binding set for query '" + queryId + "'. Binding Set: " + bindingSetString); + log.error("Could not export a binding set for query '" + queryId + "'.
Binding Set: " + result, e); + } + } + } + + @Override + public void close() { + if(exporters != null) { + for(final IncrementalResultExporter exporter : exporters) { + try { + exporter.close(); + } catch(final Exception e) { + log.warn("Problem encountered while closing one of the exporters.", e); + } } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java index 5956634..b0548b4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java @@ -18,19 +18,18 @@ */ package org.apache.rya.indexing.pcj.fluo.app.observers; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.openrdf.query.BindingSet; -import org.apache.fluo.api.client.TransactionBase; - /** * Notified when the results of a Statement Pattern have been updated to include * a new {@link BindingSet}. This observer updates its parent if the new @@ -38,7 +37,7 @@ import org.apache.fluo.api.client.TransactionBase; */ public class StatementPatternObserver extends BindingSetUpdater { - private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -49,17 +48,17 @@ public class StatementPatternObserver extends BindingSetUpdater { } @Override - public Observation parseObservation(final TransactionBase tx, final BindingSetRow parsedRow) { - checkNotNull(tx); + public Observation parseObservation(final TransactionBase tx, final Bytes row) throws Exception { + requireNonNull(tx); + requireNonNull(row); // Read the Statement Pattern metadata. - final String spNodeId = parsedRow.getNodeId(); + final String spNodeId = BindingSetRow.make(row).getNodeId(); final StatementPatternMetadata spMetadata = queryDao.readStatementPatternMetadata(tx, spNodeId); - final String bindingSetValue = parsedRow.getBindingSetString(); - // Read the Binding Set that was just emmitted by the Statement Pattern. - final VariableOrder spVarOrder = spMetadata.getVariableOrder(); - final VisibilityBindingSet spBindingSet = (VisibilityBindingSet) CONVERTER.convert(bindingSetValue, spVarOrder); + // Read the Visibility Binding Set from the value. + final Bytes valueBytes = tx.get(row, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET); + final VisibilityBindingSet spBindingSet = BS_SERDE.deserialize(valueBytes); // Figure out which node needs to handle the new metadata. 
final String parentNodeId = spMetadata.getParentNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index 70f1cbd..3c43885 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -21,11 +21,20 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; import java.util.Map; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.api.observer.AbstractObserver; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; +import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import 
org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; @@ -33,15 +42,8 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; +import com.google.common.base.Charsets; import com.google.common.collect.Maps; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.observer.AbstractObserver; /** * An observer that matches new Triples to the Statement Patterns that are part @@ -49,9 +51,11 @@ import org.apache.fluo.api.observer.AbstractObserver; * the new result is stored as a binding set for the pattern. */ public class TripleObserver extends AbstractObserver { + private static final Logger log = Logger.getLogger(TripleObserver.class); - private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); - private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new FluoQueryMetadataDAO(); + private static final VisibilityBindingSetStringConverter VIS_BS_CONVERTER = new VisibilityBindingSetStringConverter(); public TripleObserver() {} @@ -62,85 +66,113 @@ public class TripleObserver extends AbstractObserver { @Override public void process(final TransactionBase tx, final Bytes brow, final Column column) { - //get string representation of triple - String row = brow.toString(); - final String triple = IncUpdateDAO.getTripleString(brow); - String visibility = 
tx.gets(row, FluoQueryColumns.TRIPLES, ""); - - //get variable metadata for all SP in table - RowScanner rscanner = tx.scanner().over(Span.prefix(SP_PREFIX)).fetch(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER).byRow().build(); - + // Get string representation of triple. + final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow); + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Rya Statement: " + ryaStatement + "\n"); - //see if triple matches conditions of any of the SP + final String triple = IncUpdateDAO.getTripleString(ryaStatement); - for (ColumnScanner colScanner : rscanner) { + // Iterate over each of the Statement Patterns that are being matched against. + final RowScanner spScanner = tx.scanner() + .over(Span.prefix(SP_PREFIX)) + + // Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP. + .fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN) + .byRow() + .build(); + + //see if triple matches conditions of any of the SP + for (final ColumnScanner colScanner : spScanner) { + // Get the Statement Pattern's node id. final String spID = colScanner.getsRow(); - final StatementPatternMetadata spMetadata = QUERY_DAO.readStatementPatternMetadata(tx, spID); + // Fetch its metadata. + final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID); + + // Attempt to match the triple against the pattern. 
final String pattern = spMetadata.getStatementPattern(); - - for (ColumnValue cv : colScanner) { - final String varOrders = cv.getsValue(); - final VariableOrder varOrder = new VariableOrder(varOrders); - final String bindingSetString = getBindingSet(triple, pattern, varOrders); - - //Statement matches to a binding set - if(bindingSetString.length() != 0) { - final VisibilityBindingSet bindingSet = new VisibilityBindingSet( - CONVERTER.convert(bindingSetString, varOrder), - visibility); - final String valueString = CONVERTER.convert(bindingSet, varOrder); - tx.set(spID + NODEID_BS_DELIM + bindingSetString, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueString); + final VariableOrder varOrder = spMetadata.getVariableOrder(); + final String bindingSetString = getBindingSet(triple, pattern, varOrder); + + // Statement matches to a binding set. + if(bindingSetString.length() != 0) { + // Fetch the triple's visibility label. + final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, ""); + + // Create the Row ID for the emitted binding set. It does not contain visibilities. + final String row = spID + NODEID_BS_DELIM + bindingSetString; + final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) ); + + // If this is a new Binding Set, then emit it. + if(tx.get(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) == null) { + // Create the Binding Set that goes in the Node Value. It does contain visibilities. + final VisibilityBindingSet visBindingSet = VIS_BS_CONVERTER.convert(bindingSetString, varOrder); + visBindingSet.setVisibility(visibility); + + try { + final Bytes valueBytes = BS_SERDE.serialize(visBindingSet); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Matched Statement Pattern: " + spID + "\n" + + "Binding Set: " + visBindingSet + "\n"); + + tx.set(rowBytes, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes); + } catch(final Exception e) { + log.error("Couldn't serialize a Binding Set. 
This value will be skipped.", e); + } } - } - } + } + } // Once the triple has been handled, it may be deleted. - tx.delete(row, column); + tx.delete(brow, column); } /** - * Determines whether triple matches Statement Pattern ID conditions if - * so, generates a string representation of a BindingSet whose order - * is determined by varOrder. + * Determines whether a triple matches a Statement Pattern. If so, it generates a string representation of a + * BindingSet whose order is determined by varOrder. + * * @param triple - The triple to consider. - * @param spID - The statement pattern ID - * @param varOrder - The variable order - * @return The string representation of the BindingSet or an empty string, - * signifying the triple did not match the statement pattern ID. + * @param pattern - The pattern the triple must match. + * @param varOrder - The variable order of the Binding Set String that is produced by this method. + * @return The string representation of a Binding Set that is generated by matching the triple to the pattern; + * otherwise an empty string if the pattern couldn't be matched. */ - private static String getBindingSet(final String triple, final String spID, final String varOrder) { - final String[] spIdArray = spID.split(DELIM); + private static String getBindingSet(final String triple, final String pattern, final VariableOrder varOrder) { + final String[] patternArray = pattern.split(DELIM); final String[] tripleArray = triple.split(DELIM); - final String[] varOrderArray = varOrder.split(VAR_DELIM); - final Map<String,String> varMap = Maps.newHashMap(); + final String[] varOrderArray = varOrder.toArray(); + final Map<String,String> bindingValues = Maps.newHashMap(); - if(spIdArray.length != 3 || tripleArray.length != 3) { + if(patternArray.length != 3 || tripleArray.length != 3) { throw new IllegalArgumentException("Invald number of components"); } + // Extract the binding names and values. 
for(int i = 0; i < 3; i ++) { - - if(spIdArray[i].startsWith("-const-")) { - if(!spIdArray[i].substring(7).equals(tripleArray[i])) { + if(patternArray[i].startsWith("-const-")) { + // If a constant value does not match, then the triple does not match the pattern. + if(!patternArray[i].substring(7).equals(tripleArray[i])) { return ""; } } else{ - varMap.put(spIdArray[i], tripleArray[i]); + bindingValues.put(patternArray[i], tripleArray[i]); } - } - String bindingSet = ""; - - for (final String element : varOrderArray) { - if(bindingSet.length() == 0) { - bindingSet = varMap.get(element); + // Create the returned binding set string from the extracted values. + String bindingSetString = ""; + for (final String bindingName : varOrderArray) { + if(bindingSetString.length() == 0) { + bindingSetString = bindingValues.get(bindingName); } else { - bindingSet = bindingSet + DELIM + varMap.get(element); + bindingSetString = bindingSetString + DELIM + bindingValues.get(bindingName); } } - return bindingSet; + return bindingSetString; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java new file mode 100644 index 0000000..3bc8da6 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static java.util.Objects.requireNonNull; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.algebra.AggregateOperator; +import org.openrdf.query.algebra.Avg; +import org.openrdf.query.algebra.Count; +import org.openrdf.query.algebra.Max; +import org.openrdf.query.algebra.Min; +import org.openrdf.query.algebra.Sum; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + +/** + * Metadata that is relevant to Aggregate nodes. + */ +@Immutable +@DefaultAnnotation(NonNull.class) +public class AggregationMetadata extends CommonNodeMetadata { + + /** + * The different types of Aggregation functions that an aggregate node may perform. + */ + public static enum AggregationType { + MIN(Min.class), + MAX(Max.class), + COUNT(Count.class), + SUM(Sum.class), + AVERAGE(Avg.class); + + private final Class<? extends AggregateOperator> operatorClass; + + private AggregationType(final Class<? 
extends AggregateOperator> operatorClass) { + this.operatorClass = requireNonNull(operatorClass); + } + + private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass; + static { + final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder(); + for(final AggregationType type : AggregationType.values()) { + builder.put(type.operatorClass, type); + } + byOperatorClass = builder.build(); + } + + public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) { + return Optional.ofNullable( byOperatorClass.get(operatorClass) ); + } + } + + /** + * Represents all of the metadata required to perform an Aggregation that is part of a SPARQL query. + * <p> + * For example, if you have the following in SPARQL: + * <pre> + * SELECT (avg(?price) as ?avgPrice) { + * ... + * } + * </pre> + * You would construct an instance of this object like so: + * <pre> + * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice"); + * </pre> + */ + @Immutable + @DefaultAnnotation(NonNull.class) + public static final class AggregationElement implements Serializable { + private static final long serialVersionUID = 1L; + + private final AggregationType aggregationType; + private final String aggregatedBindingName; + private final String resultBindingName; + + /** + * Constructs an instance of {@link AggregationElement}. + * + * @param aggregationType - Defines how the binding values will be aggregated. (not null) + * @param aggregatedBindingName - The name of the binding whose values are aggregated. This binding must + * appear within the child node's emitted binding sets. (not null) + * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding + * must appear within the AggregationMetadata's variable order.
(not null) + */ + public AggregationElement( + final AggregationType aggregationType, + final String aggregatedBindingName, + final String resultBindingName) { + this.aggregationType = requireNonNull(aggregationType); + this.aggregatedBindingName = requireNonNull(aggregatedBindingName); + this.resultBindingName = requireNonNull(resultBindingName); + } + + /** + * @return Defines how the binding values will be aggregated. + */ + public AggregationType getAggregationType() { + return aggregationType; + } + + /** + * @return The name of the binding whose values are aggregated. This binding must appear within the child node's emitted binding sets. + */ + public String getAggregatedBindingName() { + return aggregatedBindingName; + } + + /** + * @return The name of the binding this aggregation's results are written to. This binding must appear within the AggregationMetadata's variable order. + */ + public String getResultBindingName() { + return resultBindingName; + } + + @Override + public int hashCode() { + return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName); + } + + @Override + public boolean equals(final Object o ) { + if(o instanceof AggregationElement) { + final AggregationElement agg = (AggregationElement) o; + return Objects.equals(aggregationType, agg.aggregationType) && + Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) && + Objects.equals(resultBindingName, agg.resultBindingName); + } + return false; + } + } + + private final String parentNodeId; + private final String childNodeId; + private final Collection<AggregationElement> aggregations; + private final VariableOrder groupByVariables; + + /** + * Constructs an instance of {@link AggregationMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The variable order of binding sets that are emitted by this node.
This may only contain a + * single variable because aggregations are only able to emit the aggregated value. (not null) + * @param parentNodeId - The Node ID of this node's parent. This is the node that will consume the results of the aggregations. (not null) + * @param childNodeId - The Node ID of this node's child. This is the node that will feed binding sets into the aggregations. (not null) + * @param aggregations - The aggregations that will be performed over the BindingSets that are emitted from the child node. (not null) + * @param groupByVariables - Defines how the data is grouped for the aggregation function. (not null, may be empty if no grouping is required) + */ + public AggregationMetadata( + final String nodeId, + final VariableOrder varOrder, + final String parentNodeId, + final String childNodeId, + final Collection<AggregationElement> aggregations, + final VariableOrder groupByVariables) { + super(nodeId, varOrder); + this.parentNodeId = requireNonNull(parentNodeId); + this.childNodeId = requireNonNull(childNodeId); + this.aggregations = requireNonNull(aggregations); + this.groupByVariables = requireNonNull(groupByVariables); + } + + /** + * @return The Node ID of this node's parent. This is the node that will consume the results of the aggregations. + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * @return The Node ID of this node's child. This is the node that will feed binding sets into the aggregations. + */ + public String getChildNodeId() { + return childNodeId; + } + + /** + * @return The aggregations that will be performed over the BindingSets that are emitted from the child node. + */ + public Collection<AggregationElement> getAggregations() { + return aggregations; + } + + /** + * @return Defines how the data is grouped for the aggregation function. 
+ */ + public VariableOrder getGroupByVariableOrder() { + return groupByVariables; + } + + @Override + public int hashCode() { + return Objects.hash( + super.getNodeId(), + super.getVariableOrder(), + parentNodeId, + childNodeId, + aggregations, + groupByVariables); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof AggregationMetadata) { + final AggregationMetadata metadata = (AggregationMetadata) o; + return Objects.equals(getNodeId(), metadata.getNodeId()) && + Objects.equals(super.getVariableOrder(), metadata.getVariableOrder()) && + Objects.equals(parentNodeId, metadata.parentNodeId) && + Objects.equals(childNodeId, metadata.childNodeId) && + Objects.equals(aggregations, metadata.aggregations) && + Objects.equals(groupByVariables, metadata.groupByVariables); + } + return false; + } + + @Override + public String toString() { + final StringBuilder string = new StringBuilder() + .append("AggregationMetadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Parent Node ID: " + parentNodeId + "\n") + .append(" Child Node ID: " + childNodeId + "\n"); + + // Only print the group by names if they're present. + if(!groupByVariables.getVariableOrders().isEmpty()) { + string.append(" GroupBy Variable Order: " + groupByVariables + "\n"); + } + + // Print each of the AggregationElements.
+ string.append(" Aggregations: {\n"); + final Iterator<AggregationElement> it = aggregations.iterator(); + while(it.hasNext()) { + final AggregationElement agg = it.next(); + string.append(" Type: " + agg.getAggregationType() + "\n"); + string.append(" Aggregated Binding Name: " + agg.getAggregatedBindingName() + "\n"); + string.append(" Result Binding Name: " + agg.getResultBindingName() + "\n"); + + if(it.hasNext()) { + string.append("\n"); + } + } + string.append(" }\n"); + string.append("}"); + + return string.toString(); + } + + /** + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @return A new {@link Builder} initialized with the provided nodeId. + */ + public static Builder builder(final String nodeId) { + return new Builder(nodeId); + } + + /** + * Builds instances of {@link AggregationMetadata}. + */ + @DefaultAnnotation(NonNull.class) + public static final class Builder { + + private final String nodeId; + private VariableOrder varOrder; + private String parentNodeId; + private String childNodeId; + private final List<AggregationElement> aggregations = new ArrayList<>(); + private VariableOrder groupByVariables = new VariableOrder(); + + /** + * Constructs an instance of {@link Builder}. + * + * @param nodeId - This node's Node ID. (not null) + */ + public Builder(final String nodeId) { + this.nodeId = requireNonNull(nodeId); + } + + /** + * @return This node's Node ID. + */ + public String getNodeId() { + return nodeId; + } + + /** + * @param varOrder - The variable order of binding sets that are emitted by this node. This may only contain a + * single variable because aggregations are only able to emit the aggregated value. + * @return This builder so that method invocations may be chained. + */ + public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * @param parentNodeId - The Node ID of this node's parent. 
+ * @return This builder so that method invocations may be chained. + */ + public Builder setParentNodeId(@Nullable final String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } + + /** + * @param childNodeId - The Node ID of this node's child. + * @return This builder so that method invocations may be chained. + */ + public Builder setChildNodeId(@Nullable final String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + /** + * @param aggregation - An aggregation that will be performed over the BindingSets that are emitted from the child node. + * @return This builder so that method invocations may be chained. + */ + public Builder addAggregation(@Nullable final AggregationElement aggregation) { + if(aggregation != null) { + this.aggregations.add(aggregation); + } + return this; + } + + /** + * @param groupByVariables - Defines how the data is grouped for the aggregation function. (not null, may be + * empty if no grouping is required) + * @return This builder so that method invocations may be chained. + */ + public Builder setGroupByVariableOrder(@Nullable final VariableOrder groupByVariables) { + this.groupByVariables = groupByVariables; + return this; + } + + /** + * @return An instance of {@link AggregationMetadata} built using this builder's values.
+ */ + public AggregationMetadata build() { + return new AggregationMetadata(nodeId, varOrder, parentNodeId, childNodeId, aggregations, groupByVariables); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index 263db7e..3230a5d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -18,24 +18,24 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import edu.umd.cs.findbugs.annotations.Nullable; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; - import org.apache.commons.lang3.builder.EqualsBuilder; import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + /** * Metadata for every node of a query that is being updated by the Fluo application. 
*/ @@ -47,6 +47,7 @@ public class FluoQuery { private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata; private final ImmutableMap<String, FilterMetadata> filterMetadata; private final ImmutableMap<String, JoinMetadata> joinMetadata; + private final ImmutableMap<String, AggregationMetadata> aggregationMetadata; /** * Constructs an instance of {@link FluoQuery}. Private because applications @@ -55,20 +56,24 @@ public class FluoQuery { * @param queryMetadata - The root node of a query that is updated in Fluo. (not null) * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as * it is represented within the Fluo app. (not null) - * @param filterMetadata A map from Node ID to Filter metadata as it is represented + * @param filterMetadata - A map from Node ID to Filter metadata as it is represented * within the Fluo app. (not null) - * @param joinMetadata A map from Node ID to Join metadata as it is represented + * @param joinMetadata - A map from Node ID to Join metadata as it is represented * within the Fluo app. (not null) + * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is + * represented within the Fluo app. 
(not null) */ private FluoQuery( final QueryMetadata queryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, - final ImmutableMap<String, JoinMetadata> joinMetadata) { - this.queryMetadata = checkNotNull(queryMetadata); - this.statementPatternMetadata = checkNotNull(statementPatternMetadata); - this.filterMetadata = checkNotNull(filterMetadata); - this.joinMetadata = checkNotNull(joinMetadata); + final ImmutableMap<String, JoinMetadata> joinMetadata, + final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { + this.queryMetadata = requireNonNull(queryMetadata); + this.statementPatternMetadata = requireNonNull(statementPatternMetadata); + this.filterMetadata = requireNonNull(filterMetadata); + this.joinMetadata = requireNonNull(joinMetadata); + this.aggregationMetadata = requireNonNull(aggregationMetadata); } /** @@ -85,7 +90,7 @@ public class FluoQuery { * @return The StatementPattern metadata if it could be found; otherwise absent. */ public Optional<StatementPatternMetadata> getStatementPatternMetadata(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( statementPatternMetadata.get(nodeId) ); } @@ -103,7 +108,7 @@ public class FluoQuery { * @return The Filter metadata if it could be found; otherwise absent. */ public Optional<FilterMetadata> getFilterMetadata(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( filterMetadata.get(nodeId) ); } @@ -121,7 +126,7 @@ public class FluoQuery { * @return The Join metadata if it could be found; otherwise absent. */ public Optional<JoinMetadata> getJoinMetadata(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( joinMetadata.get(nodeId) ); } @@ -132,13 +137,32 @@ public class FluoQuery { return joinMetadata.values(); } + /** + * Get an Aggregation node's metadata. 
+ * + * @param nodeId - The node ID of the Aggregation metadata you want. (not null) + * @return The Aggregation metadata if it could be found; otherwise absent. + */ + public Optional<AggregationMetadata> getAggregationMetadata(final String nodeId) { + requireNonNull(nodeId); + return Optional.fromNullable( aggregationMetadata.get(nodeId) ); + } + + /** + * @return All of the Aggregation metadata that is stored for the query. + */ + public Collection<AggregationMetadata> getAggregationMetadata() { + return aggregationMetadata.values(); + } + @Override public int hashCode() { return Objects.hashCode( queryMetadata, statementPatternMetadata, filterMetadata, - joinMetadata); + joinMetadata, + aggregationMetadata); } @Override @@ -154,6 +178,7 @@ public class FluoQuery { .append(statementPatternMetadata, fluoQuery.statementPatternMetadata) .append(filterMetadata, fluoQuery.filterMetadata) .append(joinMetadata, fluoQuery.joinMetadata) + .append(aggregationMetadata, fluoQuery.aggregationMetadata) .isEquals(); } @@ -184,6 +209,11 @@ public class FluoQuery { builder.append("\n"); } + for(final AggregationMetadata metadata : aggregationMetadata.values()) { + builder.append(metadata.toString()); + builder.append("\n"); + } + return builder.toString(); } @@ -204,6 +234,7 @@ public class FluoQuery { private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>(); private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>(); private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>(); + private final Map<String, AggregationMetadata.Builder> aggregationBuilders = new HashMap<>(); /** * Sets the {@link QueryMetadata.Builder} that is used by this builder. @@ -230,7 +261,7 @@ public class FluoQuery { * @return This builder so that method invocation may be chained. 
*/ public Builder addStatementPatternBuilder(final StatementPatternMetadata.Builder spBuilder) { - checkNotNull(spBuilder); + requireNonNull(spBuilder); spBuilders.put(spBuilder.getNodeId(), spBuilder); return this; } @@ -242,7 +273,7 @@ public class FluoQuery { * @return The builder that was stored at the node id if one was found. */ public Optional<StatementPatternMetadata.Builder> getStatementPatternBuilder(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( spBuilders.get(nodeId) ); } @@ -252,8 +283,8 @@ public class FluoQuery { * @param filterBuilder - A builder representing a specific Filter within the query. (not null) * @return This builder so that method invocation may be chained. */ - public Builder addFilterMetadata(@Nullable final FilterMetadata.Builder filterBuilder) { - checkNotNull(filterBuilder); + public Builder addFilterMetadata(final FilterMetadata.Builder filterBuilder) { + requireNonNull(filterBuilder); this.filterBuilders.put(filterBuilder.getNodeId(), filterBuilder); return this; } @@ -265,7 +296,7 @@ public class FluoQuery { * @return The builder that was stored at the node id if one was found. */ public Optional<FilterMetadata.Builder> getFilterBuilder(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( filterBuilders.get(nodeId) ); } @@ -275,20 +306,43 @@ public class FluoQuery { * @param joinBuilder - A builder representing a specific Join within the query. (not null) * @return This builder so that method invocation may be chained. */ - public Builder addJoinMetadata(@Nullable final JoinMetadata.Builder joinBuilder) { - checkNotNull(joinBuilder); + public Builder addJoinMetadata(final JoinMetadata.Builder joinBuilder) { + requireNonNull(joinBuilder); this.joinBuilders.put(joinBuilder.getNodeId(), joinBuilder); return this; } /** + * Get an Aggregate builder from this builder. 
+ * + * @param nodeId - The Node ID the Aggregate builder was stored at. (not null) + * @return The builder that was stored at the node id if one was found. + */ + public Optional<AggregationMetadata.Builder> getAggregateBuilder(final String nodeId) { + requireNonNull(nodeId); + return Optional.fromNullable( aggregationBuilders.get(nodeId) ); + } + + /** + * Adds a new {@link AggregationMetadata.Builder} to this builder. + * + * @param aggregationBuilder - A builder representing a specific Aggregation within the Query. (not null) + * @return This builder so that method invocation may be chained. + */ + public Builder addAggregateMetadata(@Nullable final AggregationMetadata.Builder aggregationBuilder) { + requireNonNull(aggregationBuilder); + this.aggregationBuilders.put(aggregationBuilder.getNodeId(), aggregationBuilder); + return this; + } + + /** * Get a Join builder from this builder. * * @param nodeId - The Node ID the Join builder was stored at. (not null) * @return The builder that was stored at the node id if one was found. 
*/ public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); return Optional.fromNullable( joinBuilders.get(nodeId) ); } @@ -313,7 +367,12 @@ public class FluoQuery { joinMetadata.put(entry.getKey(), entry.getValue().build()); } - return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build()); + final ImmutableMap.Builder<String, AggregationMetadata> aggregateMetadata = ImmutableMap.builder(); + for(final Entry<String, AggregationMetadata.Builder> entry : aggregationBuilders.entrySet()) { + aggregateMetadata.put(entry.getKey(), entry.getValue().build()); + } + + return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index f12c6ab..77d6a49 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -23,11 +23,13 @@ import static java.util.Objects.requireNonNull; import java.util.Arrays; import java.util.List; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import 
edu.umd.cs.findbugs.annotations.NonNull; -import org.apache.fluo.api.data.Column; - /** * Holds {@link Column} objects that represent where each piece of metadata is stored. * <p> @@ -40,7 +42,7 @@ import org.apache.fluo.api.data.Column; * <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> - * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A Binding Set that matches the query.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> * <p> @@ -53,7 +55,7 @@ import org.apache.fluo.api.data.Column; * <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> <td>Indicates which filter within the original SPARQL query this represents.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node ID this filter emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node ID of the node that feeds this node Binding Sets.</td> </tr> - * <tr> <td>Node ID + DELIM + Binding set String</td> <td>filterMetadata:bindingSet</td> <td>A Binding Set that matches the Filter.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>filterMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> * <p> @@ -66,7 +68,7 @@ import org.apache.fluo.api.data.Column; * <tr> <td>Node ID</td> <td>joinMetadata:parentNodeId</td> <td>The Node ID this join emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:leftChildNodeId</td> <td>A Node ID of the node that feeds this node 
Binding Sets.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> - * <tr> <td>Node ID + DELIM + Binding set String</td> <td>joinMetadata:bindingSet</td> <td>A Binding Set that matches the Join.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>joinMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> * <p> @@ -77,9 +79,22 @@ import org.apache.fluo.api.data.Column; * <tr> <td>Node ID</td> <td>statementPatternMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>statementPatternMetadata:pattern</td> <td>The pattern that defines which Statements will be matched.</td> </tr> * <tr> <td>Node ID</td> <td>statementPatternMetadata:parentNodeId</td> <td>The Node ID this statement pattern emits Binding Sets to.</td> </tr> - * <tr> <td>Node ID + DELIM + Binding set String</td> <td>statementPatternMetadata:bindingSet</td> <td>A Binding Set that matches the Statement Pattern.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>statementPatternMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> + * </p> * <p> + * <b>Aggregation Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Node ID</td> <td>aggregationMetadata:nodeId</td> <td>The Node ID of the Aggregation.</td> </tr> + * <tr> <td>Node ID</td> <td>aggregationMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>aggregationMetadata:parentNodeId</td> <td>The Node ID this Aggregation emits its result Binding Set to.</td> </tr> + * <tr> <td>Node ID</td> <td>aggregationMetadata:childNodeId</td> <td>The Node ID of the node that feeds this node Binding Sets.</td> </tr> + * <tr> <td>Node ID</td>
<td>aggregationMetadata:groupByBindingNames</td> <td>An ordered list of the binding names the aggregation's results will be grouped by.</td> </tr> + * <tr> <td>Node ID</td> <td>aggregationMetadata:aggregations</td> <td>A serialized form of the aggregations that need to be performed by this aggregation node.</td> </tr> + * <tr> <td>Node ID + DELIM + Group By Values Binding Set String</td> <td>aggregationMetadata:bindingSet</td><td>An {@link AggregationState} object.</td> </tr> + * </table> + * </p> */ public class FluoQueryColumns { @@ -88,6 +103,7 @@ public class FluoQueryColumns { public static final String FILTER_METADATA_CF = "filterMetadata"; public static final String JOIN_METADATA_CF = "joinMetadata"; public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; + public static final String AGGREGATION_METADATA_CF = "aggregationMetadata"; /** * New triples that have been added to Rya are written as a row in this @@ -96,7 +112,7 @@ public class FluoQueryColumns { * <p> * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr> + * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>The visibility label for the triple.</td> </tr> * </table> * </p> */ @@ -108,7 +124,7 @@ public class FluoQueryColumns { * <p> * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the reuslts of this query will be exported to.</td> </tr> + * <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the results of this query will be exported to.</td> </tr> * </table> * </p> */ @@ -160,6 +176,15 @@ public class FluoQueryColumns { public static final Column STATEMENT_PATTERN_PARENT_NODE_ID = new Column(STATEMENT_PATTERN_METADATA_CF, "parentNodeId"); 
public static final Column STATEMENT_PATTERN_BINDING_SET = new Column(STATEMENT_PATTERN_METADATA_CF, "bindingSet"); + // Aggregation Metadata columns. + public static final Column AGGREGATION_NODE_ID = new Column(AGGREGATION_METADATA_CF, "nodeId"); + public static final Column AGGREGATION_VARIABLE_ORDER = new Column(AGGREGATION_METADATA_CF, "variableOrder"); + public static final Column AGGREGATION_PARENT_NODE_ID = new Column(AGGREGATION_METADATA_CF, "parentNodeId"); + public static final Column AGGREGATION_CHILD_NODE_ID = new Column(AGGREGATION_METADATA_CF, "childNodeId"); + public static final Column AGGREGATION_GROUP_BY_BINDING_NAMES = new Column(AGGREGATION_METADATA_CF, "groupByBindingNames"); + public static final Column AGGREGATION_AGGREGATIONS = new Column(AGGREGATION_METADATA_CF, "aggregations"); + public static final Column AGGREGATION_BINDING_SET = new Column(AGGREGATION_METADATA_CF, "bindingSet"); + /** * Enumerates the {@link Column}s that hold all of the fields for each type * of node that can compose a query. @@ -204,16 +229,27 @@ public class FluoQueryColumns { Arrays.asList(STATEMENT_PATTERN_NODE_ID, STATEMENT_PATTERN_VARIABLE_ORDER, STATEMENT_PATTERN_PATTERN, - STATEMENT_PATTERN_PARENT_NODE_ID)); + STATEMENT_PATTERN_PARENT_NODE_ID)), + + /** + * The columns an {@link AggregationMetadata} object's fields are stored within. + */ + AGGREGATION_COLUMNS( + Arrays.asList(AGGREGATION_NODE_ID, + AGGREGATION_VARIABLE_ORDER, + AGGREGATION_PARENT_NODE_ID, + AGGREGATION_CHILD_NODE_ID, + AGGREGATION_GROUP_BY_BINDING_NAMES, + AGGREGATION_AGGREGATIONS)); - private List<Column> columns; + private final List<Column> columns; /** * Constructs an instance of {@link QueryNodeMetadataColumns}. * * @param columns - The {@link Column}s associated with this node's metadata. 
(not null) */ - private QueryNodeMetadataColumns(List<Column> columns) { + private QueryNodeMetadataColumns(final List<Column> columns) { this.columns = requireNonNull(columns); } @@ -224,4 +260,4 @@ public class FluoQueryColumns { return columns; } } -} +} \ No newline at end of file
