http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index f9acb11..dfc3333 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -18,23 +18,30 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collection; import java.util.Map; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import com.google.common.collect.Sets; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; -import org.apache.fluo.api.client.SnapshotBase; -import org.apache.fluo.api.client.TransactionBase; -import 
org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Reads and writes {@link FluoQuery} instances and their components to/from @@ -50,8 +57,8 @@ public class FluoQueryMetadataDAO { * @param metadata - The Query node metadata that will be written to the table. (not null) */ public void write(final TransactionBase tx, final QueryMetadata metadata) { - checkNotNull(tx); - checkNotNull(metadata); + requireNonNull(tx); + requireNonNull(metadata); final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId); @@ -65,19 +72,19 @@ public class FluoQueryMetadataDAO { * * @param sx - The snapshot that will be used to read the metadata . (not null) * @param nodeId - The nodeId of the Query node that will be read. (not nul) - * @return The {@link QueryMetadata} that was read from table. + * @return The {@link QueryMetadata} that was read from the table. */ public QueryMetadata readQueryMetadata(final SnapshotBase sx, final String nodeId) { return readQueryMetadataBuilder(sx, nodeId).build(); } private QueryMetadata.Builder readQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) { - checkNotNull(sx); - checkNotNull(nodeId); + requireNonNull(sx); + requireNonNull(nodeId); // Fetch the values from the Fluo table. final String rowId = nodeId; - final Map<Column, String> values = sx.gets(rowId, + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, FluoQueryColumns.QUERY_SPARQL, FluoQueryColumns.QUERY_CHILD_NODE_ID); @@ -102,8 +109,8 @@ public class FluoQueryMetadataDAO { * @param metadata - The Filter node metadata that will be written to the table. 
(not null) */ public void write(final TransactionBase tx, final FilterMetadata metadata) { - checkNotNull(tx); - checkNotNull(metadata); + requireNonNull(tx); + requireNonNull(metadata); final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId); @@ -119,19 +126,19 @@ public class FluoQueryMetadataDAO { * * @param sx - The snapshot that will be used to read the metadata. (not null) * @param nodeId - The nodeId of the Filter node that will be read. (not nul) - * @return The {@link FilterMetadata} that was read from table. + * @return The {@link FilterMetadata} that was read from the table. */ public FilterMetadata readFilterMetadata(final SnapshotBase sx, final String nodeId) { return readFilterMetadataBuilder(sx, nodeId).build(); } private FilterMetadata.Builder readFilterMetadataBuilder(final SnapshotBase sx, final String nodeId) { - checkNotNull(sx); - checkNotNull(nodeId); + requireNonNull(sx); + requireNonNull(nodeId); // Fetch the values from the Fluo table. final String rowId = nodeId; - final Map<Column, String> values = sx.gets(rowId, + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, @@ -162,8 +169,8 @@ public class FluoQueryMetadataDAO { * @param metadata - The Join node metadata that will be written to the table. (not null) */ public void write(final TransactionBase tx, final JoinMetadata metadata) { - checkNotNull(tx); - checkNotNull(metadata); + requireNonNull(tx); + requireNonNull(metadata); final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId); @@ -179,15 +186,15 @@ public class FluoQueryMetadataDAO { * * @param sx - The snapshot that will be used to read the metadata. (not null) * @param nodeId - The nodeId of the Join node that will be read. (not nul) - * @return The {@link JoinMetadata} that was read from table. 
+ * @return The {@link JoinMetadata} that was read from the table. */ public JoinMetadata readJoinMetadata(final SnapshotBase sx, final String nodeId) { return readJoinMetadataBuilder(sx, nodeId).build(); } private JoinMetadata.Builder readJoinMetadataBuilder(final SnapshotBase sx, final String nodeId) { - checkNotNull(sx); - checkNotNull(nodeId); + requireNonNull(sx); + requireNonNull(nodeId); // Fetch the values from the Fluo table. final String rowId = nodeId; @@ -224,8 +231,8 @@ public class FluoQueryMetadataDAO { * @param metadata - The Statement Pattern node metadata that will be written to the table. (not null) */ public void write(final TransactionBase tx, final StatementPatternMetadata metadata) { - checkNotNull(tx); - checkNotNull(metadata); + requireNonNull(tx); + requireNonNull(metadata); final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId); @@ -239,15 +246,15 @@ public class FluoQueryMetadataDAO { * * @param sx - The snapshot that will be used to read the metadata. (not null) * @param nodeId - The nodeId of the Statement Pattern node that will be read. (not nul) - * @return The {@link StatementPatternMetadata} that was read from table. + * @return The {@link StatementPatternMetadata} that was read from the table. */ public StatementPatternMetadata readStatementPatternMetadata(final SnapshotBase sx, final String nodeId) { return readStatementPatternMetadataBuilder(sx, nodeId).build(); } private StatementPatternMetadata.Builder readStatementPatternMetadataBuilder(final SnapshotBase sx, final String nodeId) { - checkNotNull(sx); - checkNotNull(nodeId); + requireNonNull(sx); + requireNonNull(nodeId); // Fetch the values from the Fluo table. final String rowId = nodeId; @@ -270,14 +277,104 @@ public class FluoQueryMetadataDAO { } /** + * Write an instance of {@link AggregationMetadata} to the Fluo table. + * + * @param tx - The transaction that will be used to commit the metadata. 
(not null) + * @param metadata - The Aggregation node metadata that will be written to the table. (not null) + */ + public void write(final TransactionBase tx, final AggregationMetadata metadata) { + requireNonNull(tx); + requireNonNull(metadata); + + final String rowId = metadata.getNodeId(); + tx.set(rowId, FluoQueryColumns.AGGREGATION_NODE_ID, rowId); + tx.set(rowId, FluoQueryColumns.AGGREGATION_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.AGGREGATION_PARENT_NODE_ID, metadata.getParentNodeId()); + tx.set(rowId, FluoQueryColumns.AGGREGATION_CHILD_NODE_ID, metadata.getChildNodeId()); + + // Store the Group By variable order. + final VariableOrder groupByVars = metadata.getGroupByVariableOrder(); + final String groupByString = Joiner.on(";").join(groupByVars.getVariableOrders()); + tx.set(rowId, FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES, groupByString); + + // Serialize the collection of AggregationElements. + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try(final ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject( metadata.getAggregations() ); + } catch (final IOException e) { + throw new RuntimeException("Problem encountered while writing AggregationMetadata to the Fluo table. Unable " + + "to serialize the AggregationElements to a byte[].", e); + } + tx.set(Bytes.of(rowId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS, Bytes.of(baos.toByteArray())); + } + + /** + * Read an instance of {@link AggregationMetadata} from the Fluo table. + * + * @param sx - The snapshot that will be used to read the metadata. (not null) + * @param nodeId - The nodeId of the Aggregation node that will be read. (not null) + * @return The {@link AggregationMetadata} that was read from the table. 
+ */ + public AggregationMetadata readAggregationMetadata(final SnapshotBase sx, final String nodeId) { + return readAggregationMetadataBuilder(sx, nodeId).build(); + } + + private AggregationMetadata.Builder readAggregationMetadataBuilder(final SnapshotBase sx, final String nodeId) { + requireNonNull(sx); + requireNonNull(nodeId); + + // Fetch the values from the Fluo table. + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, + FluoQueryColumns.AGGREGATION_VARIABLE_ORDER, + FluoQueryColumns.AGGREGATION_PARENT_NODE_ID, + FluoQueryColumns.AGGREGATION_CHILD_NODE_ID, + FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES); + + + // Return an object holding them. + final String varOrderString = values.get(FluoQueryColumns.AGGREGATION_VARIABLE_ORDER); + final VariableOrder varOrder = new VariableOrder(varOrderString); + + final String parentNodeId = values.get(FluoQueryColumns.AGGREGATION_PARENT_NODE_ID); + final String childNodeId = values.get(FluoQueryColumns.AGGREGATION_CHILD_NODE_ID); + + // Read the Group By variable order if one was present. + final String groupByString = values.get(FluoQueryColumns.AGGREGATION_GROUP_BY_BINDING_NAMES); + final VariableOrder groupByVars = groupByString.isEmpty() ? new VariableOrder() : new VariableOrder( groupByString.split(";") ); + + // Deserialize the collection of AggregationElements. + final Bytes aggBytes = sx.get(Bytes.of(nodeId.getBytes(Charsets.UTF_8)), FluoQueryColumns.AGGREGATION_AGGREGATIONS); + final Collection<AggregationElement> aggregations; + try(final ObjectInputStream ois = new ObjectInputStream(aggBytes.toInputStream())) { + aggregations = (Collection<AggregationElement>)ois.readObject(); + } catch (final IOException | ClassNotFoundException e) { + throw new RuntimeException("Problem encountered while reading AggregationMetadata from the Fluo table. 
Unable " + + "to deserialize the AggregationElements from a byte[].", e); + } + + final AggregationMetadata.Builder builder = AggregationMetadata.builder(nodeId) + .setVariableOrder(varOrder) + .setParentNodeId(parentNodeId) + .setChildNodeId(childNodeId) + .setGroupByVariableOrder(groupByVars); + + for(final AggregationElement aggregation : aggregations) { + builder.addAggregation(aggregation); + } + + return builder; + } + + /** * Write an instance of {@link FluoQuery} to the Fluo table. * * @param tx - The transaction that will be used to commit the metadata. (not null) * @param query - The query metadata that will be written to the table. (not null) */ public void write(final TransactionBase tx, final FluoQuery query) { - checkNotNull(tx); - checkNotNull(query); + requireNonNull(tx); + requireNonNull(query); // Store the Query ID so that it may be looked up from the original SPARQL string. final String sparql = query.getQueryMetadata().getSparql(); @@ -298,6 +395,10 @@ public class FluoQueryMetadataDAO { for(final StatementPatternMetadata statementPattern : query.getStatementPatternMetadata()) { write(tx, statementPattern); } + + for(final AggregationMetadata aggregation : query.getAggregationMetadata()) { + write(tx, aggregation); + } } /** @@ -308,8 +409,8 @@ public class FluoQueryMetadataDAO { * @return The {@link FluoQuery} that was read from table. 
*/ public FluoQuery readFluoQuery(final SnapshotBase sx, final String queryId) { - checkNotNull(sx); - checkNotNull(queryId); + requireNonNull(sx); + requireNonNull(queryId); final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder(); addChildMetadata(sx, fluoQueryBuilder, queryId); @@ -317,9 +418,9 @@ public class FluoQueryMetadataDAO { } private void addChildMetadata(final SnapshotBase sx, final FluoQuery.Builder builder, final String childNodeId) { - checkNotNull(sx); - checkNotNull(builder); - checkNotNull(childNodeId); + requireNonNull(sx); + requireNonNull(builder); + requireNonNull(childNodeId); final NodeType childType = NodeType.fromNodeId(childNodeId).get(); switch(childType) { @@ -357,6 +458,15 @@ public class FluoQueryMetadataDAO { final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId); builder.addStatementPatternBuilder(spBuilder); break; + + case AGGREGATION: + // Add this node's metadata. + final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId); + builder.addAggregateMetadata(aggregationBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId()); + break; } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 2128700..562470a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; @@ -29,32 +30,40 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; +import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.algebra.AggregateOperator; +import org.openrdf.query.algebra.Extension; import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Group; +import org.openrdf.query.algebra.GroupElem; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.QueryModelNode; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import org.openrdf.query.parser.ParsedQuery; -import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import net.jcip.annotations.Immutable; + /** * Creates the {@link FluoQuery} metadata that is required by the Fluo * application to process a SPARQL query. 
@@ -119,7 +128,7 @@ public class SparqlFluoQueryBuilder { */ public Optional<String> getId(final QueryModelNode node) { checkNotNull(node); - return Optional.fromNullable( nodeIds.get(node) ); + return Optional.ofNullable( nodeIds.get(node) ); } /** @@ -157,14 +166,15 @@ public class SparqlFluoQueryBuilder { prefix = JOIN_PREFIX; } else if(node instanceof Projection) { prefix = QUERY_PREFIX; + } else if(node instanceof Extension) { + prefix = AGGREGATION_PREFIX; } else { - throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Projection} but was " + node.getClass()); + throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass()); } // Create the unique portion of the id. final String unique = UUID.randomUUID().toString().replaceAll("-", ""); - // Put them together to create the Node ID. return prefix + "_" + unique; } @@ -204,6 +214,77 @@ public class SparqlFluoQueryBuilder { this.nodeIds = checkNotNull(nodeIds); } + /** + * If we encounter an Extension node that contains a Group, then we've found an aggregation. + */ + @Override + public void meet(final Extension node) { + final TupleExpr arg = node.getArg(); + if(arg instanceof Group) { + final Group group = (Group) arg; + + // Get the Aggregation Node's id. + final String aggregationId = nodeIds.getOrMakeId(node); + + // Get the group's child node id. This call forces it to be a supported child type. + final TupleExpr child = group.getArg(); + final String childNodeId = nodeIds.getOrMakeId( child ); + + // Get the list of group by binding names. + VariableOrder groupByVariableOrder = null; + if(!group.getGroupBindingNames().isEmpty()) { + groupByVariableOrder = new VariableOrder(group.getGroupBindingNames()); + } else { + groupByVariableOrder = new VariableOrder(); + } + + // The aggregations that need to be performed are the Group Elements. 
+ final List<AggregationElement> aggregations = new ArrayList<>(); + for(final GroupElem groupElem : group.getGroupElements()) { + // Figure out the type of the aggregation. + final AggregateOperator operator = groupElem.getOperator(); + final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() ); + + // If the type is one we support, create the AggregationElement. + if(type.isPresent()) { + final String resultBindingName = groupElem.getName(); + + final AtomicReference<String> aggregatedBindingName = new AtomicReference<>(); + groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() { + @Override + public void meet(final Var node) { + aggregatedBindingName.set( node.getName() ); + } + }); + + aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) ); + } + } + + // Update the aggregation's metadata. + AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(aggregationId).orNull(); + if(aggregationBuilder == null) { + aggregationBuilder = AggregationMetadata.builder(aggregationId); + fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); + } + + aggregationBuilder.setChildNodeId(childNodeId); + aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder); + for(final AggregationElement aggregation : aggregations) { + aggregationBuilder.addAggregation(aggregation); + } + + // Update the child node's metadata. + final Set<String> childVars = getVars(child); + final VariableOrder childVarOrder = new VariableOrder(childVars); + + setChildMetadata(childNodeId, childVarOrder, aggregationId); + } + + // Walk to the next node. + super.meet(node); + } + @Override public void meet(final StatementPattern node) { // Extract metadata that will be stored from the node. 
@@ -386,10 +467,21 @@ public class SparqlFluoQueryBuilder { filterBuilder.setParentNodeId(parentNodeId); break; - case QUERY: - throw new IllegalArgumentException("QUERY nodes do not have children."); - default: - throw new IllegalArgumentException("Unsupported NodeType: " + childType); + case AGGREGATION: + AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); + if(aggregationBuilder == null) { + aggregationBuilder = AggregationMetadata.builder(childNodeId); + fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); + } + + aggregationBuilder.setVariableOrder(childVarOrder); + aggregationBuilder.setParentNodeId(parentNodeId); + break; + + case QUERY: + throw new IllegalArgumentException("QUERY nodes do not have children."); + default: + throw new IllegalArgumentException("Unsupported NodeType: " + childType); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java new file mode 100644 index 0000000..30f026c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingSetUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +/** + * A utility class that defines functions that make it easier to work with {@link BindingSet} objects. + */ +public class BindingSetUtil { + + /** + * Create a new {@link BindingSet} that only includes the bindings whose names appear within the {@code variableOrder}. + * If no binding is found for a variable, then that binding is just omitted from the resulting object. + * + * @param variableOrder - Defines which bindings will be kept. (not null) + * @param bindingSet - Contains the source {@link Binding}s. (not null) + * @return A new {@link BindingSet} containing only the specified bindings. 
+ */ + public static BindingSet keepBindings(final VariableOrder variableOrder, final BindingSet bindingSet) { + requireNonNull(variableOrder); + requireNonNull(bindingSet); + + final MapBindingSet result = new MapBindingSet(); + for(final String bindingName : variableOrder) { + if(bindingSet.hasBinding(bindingName)) { + final Binding binding = bindingSet.getBinding(bindingName); + result.addBinding(binding); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java new file mode 100644 index 0000000..ffb2320 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/RowKeyUtil.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Charsets; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * TODO doc that this implements utility functions used to create the Fluo Row Keys used when referencing the binding + * set results of a query node. + */ +@DefaultAnnotation(NonNull.class) +public class RowKeyUtil { + + private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter(); + + /** + * Creates the Row Key that will be used by a node within the PCJ Fluo application to represent where a specific + * result of that node will be placed. + * + * @param nodeId - Identifies the Node that the Row Key is for. (not null) + * @param variableOrder - Specifies which bindings from {@code bindingSet} will be included within the Row Key as + * well as the order they will appear. (not null) + * @param bindingSet - The Binding Set whose values will be used to create the Row Key. (not null) + * @return A Row Key built using the provided values. + */ + public static Bytes makeRowKey(final String nodeId, final VariableOrder variableOrder, final BindingSet bindingSet) { + requireNonNull(nodeId); + requireNonNull(variableOrder); + requireNonNull(bindingSet); + + // The Row Key starts with the Node ID of the node the result belongs to. + String rowId = nodeId + IncrementalUpdateConstants.NODEID_BS_DELIM; + + // Append the String formatted bindings that are included in the Variable Order. The Variable Order also defines + // the order the binding will be written to the Row Key. 
If a Binding is missing for one of the Binding Names + // that appears within the Variable Order, then an empty value will be written for that location within the Row Key. + rowId += BS_CONVERTER.convert(bindingSet, variableOrder); + + // Format the Row Key as a UTF 8 encoded Bytes object. + return Bytes.of( rowId.getBytes(Charsets.UTF_8) ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java new file mode 100644 index 0000000..99791ee --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.rya.indexing.pcj.fluo.app; + +import static org.junit.Assert.assertEquals; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Tests the methods of {@link VisibilityBindingSetSerDe}. + */ +public class VisibilityBindingSetSerDeTest { + + @Test + public void rountTrip() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", vf.createLiteral("Alice")); + bs.addBinding("age", vf.createLiteral(5)); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u"); + + final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe(); + final Bytes bytes = serde.serialize(original); + final VisibilityBindingSet result = serde.deserialize(bytes); + + assertEquals(original, result); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java index 43dac3c..854798d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -135,7 +135,7 @@ public class NewQueryCommand implements PcjAdminClientCommand { // Tell the Fluo PCJ Updater app to maintain the PCJ. 
createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, accumulo, ryaTablePrefix); - } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { + } catch (MalformedQueryException | PcjException | RyaDAOException e) { throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index 105f697..c8dc737 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -18,19 +18,29 @@ */ package org.apache.rya.indexing.pcj.fluo.demo; +import java.io.IOException; import java.util.Set; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.commons.lang3.StringUtils; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.mini.MiniFluo; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.api.resolver.RyaToRdfConversions; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PcjException; 
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.rdftriplestore.RyaSailRepository; import org.openrdf.model.Statement; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; @@ -45,16 +55,6 @@ import org.openrdf.sail.SailException; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.rdftriplestore.RyaSailRepository; - /** * Demonstrates historicly added Rya statements that are stored within the core * Rya tables joining with newly streamed statements into the Fluo application. @@ -181,7 +181,7 @@ public class FluoAndHistoricPcjsDemo implements Demo { // Tell the Fluo app to maintain it. new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix); - } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { + } catch (MalformedQueryException | PcjException | RyaDAOException e) { throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e); } @@ -192,11 +192,11 @@ public class FluoAndHistoricPcjsDemo implements Demo { // 5. Show that the Fluo app exported the results to the PCJ table in Accumulo. 
log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:"); - try { - for(final BindingSet result : pcjStorage.listResults(pcjId)) { - log.info(" " + result); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + log.info(" " + resultsIt.next()); } - } catch (final PCJStorageException e) { + } catch (final Exception e) { throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e); } waitForEnter(); @@ -257,11 +257,11 @@ public class FluoAndHistoricPcjsDemo implements Demo { // 8. Show the new results have been exported to the PCJ table in Accumulo. log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:"); - try { - for(final BindingSet result : pcjStorage.listResults(pcjId)) { - log.info(" " + result); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + log.info(" " + resultsIt.next()); } - } catch (final PCJStorageException e) { + } catch (final Exception e) { throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e); } log.info(""); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index ab99ecd..9263362 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -41,6 +41,11 @@ <groupId>org.apache.rya</groupId> <artifactId>rya.indexing</artifactId> </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + <!-- Testing dependencies. 
--> <dependency> <groupId>org.apache.fluo</groupId> @@ -86,8 +91,10 @@ </exclusion> </exclusions> </dependency> - - - + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-test</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java deleted file mode 100644 index 6e696c8..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.accumulo.minicluster.MiniAccumuloConfig; -import org.apache.fluo.api.client.FluoAdmin; -import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; -import org.apache.fluo.api.client.FluoAdmin.TableExistsException; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder; -import org.apache.rya.api.domain.RyaType; -import 
org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.rya.sail.config.RyaSailFactory; -import org.apache.zookeeper.ClientCnxn; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.openrdf.model.Statement; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.MapBindingSet; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.sail.Sail; - -import com.google.common.io.Files; - -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; - -/** - * Integration tests that ensure the Fluo application processes PCJs results - * correctly. - * <p> - * This class is being ignored because it doesn't contain any unit tests. - */ -public abstract class ITBase { - private static final Logger log = Logger.getLogger(ITBase.class); - - // Rya data store and connections. 
- protected static final String RYA_INSTANCE_NAME = "demo_"; - protected RyaSailRepository ryaRepo = null; - protected RepositoryConnection ryaConn = null; - - // Mini Accumulo Cluster - protected static final String ACCUMULO_USER = "root"; - protected static final String ACCUMULO_PASSWORD = "password"; - protected MiniAccumuloCluster cluster; - protected static Connector accumuloConn = null; - protected String instanceName = null; - protected String zookeepers = null; - - // Fluo data store and connections. - protected static final String FLUO_APP_NAME = "IntegrationTests"; - protected MiniFluo fluo = null; - protected FluoClient fluoClient = null; - - @BeforeClass - public static void killLoudLogs() { - Logger.getRootLogger().setLevel(Level.ERROR); - Logger.getLogger(ClientCnxn.class).setLevel(Level.OFF); - } - - @Before - public void setupMiniResources() throws Exception { - // Will set defaults for log4J - org.apache.log4j.BasicConfigurator.configure(); - // Initialize the Mini Accumulo that will be used to host Rya and Fluo. - setupMiniAccumulo(); - - // Initialize the Mini Fluo that will be used to store created queries. - fluo = startMiniFluo(); - fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); - - // Initialize the Rya that will be used by the tests. 
- ryaRepo = setupRya(instanceName, zookeepers); - ryaConn = ryaRepo.getConnection(); - } - - @After - public void shutdownMiniResources() { - if (ryaConn != null) { - try { - log.info("Shutting down Rya Connection."); - ryaConn.close(); - log.info("Rya Connection shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Rya Connection.", e); - } - } - - if (ryaRepo != null) { - try { - log.info("Shutting down Rya Repo."); - ryaRepo.shutDown(); - log.info("Rya Repo shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Rya Repo.", e); - } - } - - if (fluoClient != null) { - try { - log.info("Shutting down Fluo Client."); - fluoClient.close(); - log.info("Fluo Client shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Fluo Client.", e); - } - } - - if (fluo != null) { - try { - log.info("Shutting down Mini Fluo."); - fluo.close(); - log.info("Mini Fluo shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Mini Fluo.", e); - } - } - - if(cluster != null) { - try { - log.info("Shutting down the Mini Accumulo being used as a Rya store."); - cluster.stop(); - log.info("Mini Accumulo being used as a Rya store shut down."); - } catch(final Exception e) { - log.error("Could not shut down the Mini Accumulo.", e); - } - } - } - - /** - * A helper fuction for creating a {@link BindingSet} from an array of - * {@link Binding}s. - * - * @param bindings - The bindings to include in the set. (not null) - * @return A {@link BindingSet} holding the bindings. - */ - protected static BindingSet makeBindingSet(final Binding... bindings) { - final MapBindingSet bindingSet = new MapBindingSet(); - for (final Binding binding : bindings) { - bindingSet.addBinding(binding); - } - return bindingSet; - } - - /** - * A helper function for creating a {@link RyaStatement} that represents a - * Triple. - * - * @param subject - The Subject of the Triple. 
(not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) - * @return A Triple as a {@link RyaStatement}. - */ - protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) { - checkNotNull(subject); - checkNotNull(predicate); - checkNotNull(object); - - final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject)) - .setPredicate(new RyaURI(predicate)); - - if (object.startsWith("http://") || object.startsWith("tag:")) { - builder.setObject(new RyaURI(object)); - } else { - builder.setObject(new RyaType(object)); - } - - return builder.build(); - } - - /** - * A helper function for creating a {@link RyaStatement} that represents a Triple. - * This overload takes a typed literal for the object. Prepare it like this for example specify the type (wktLiteral) and the value (Point...): - * makeRyaStatement(s, p, new RyaType(new URIImpl("http://www.opengis.net/ont/geosparql#wktLiteral"), "Point(-77.03524 38.889468)")) // - * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) - * @return A Triple as a {@link RyaStatement}. - */ - protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final RyaType object) { - checkNotNull(subject); - checkNotNull(predicate); - checkNotNull(object); - - final RyaStatementBuilder builder = RyaStatement.builder()// - .setSubject(new RyaURI(subject))// - .setPredicate(new RyaURI(predicate))// - .setObject(object); - return builder.build(); - } - - /** - * A helper function for creating a {@link RyaStatement} that represents a Triple with an integer. - * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. 
(not null) - * @param object - The Object of the Triple, an integer value (int). - * @return A Triple as a {@link RyaStatement}. - */ - protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) { - checkNotNull(subject); - checkNotNull(predicate); - - return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate)) - .setObject(new RyaType(XMLSchema.INT, "" + object)).build(); - } - - /** - * A helper function for creating a Sesame {@link Statement} that represents - * a Triple.. - * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) - * @return A Triple as a {@link Statement}. - */ - protected static Statement makeStatement(final String subject, final String predicate, final String object) { - checkNotNull(subject); - checkNotNull(predicate); - checkNotNull(object); - - final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object); - return RyaToRdfConversions.convertStatement(ryaStmt); - } - - /** - * Fetches the binding sets that are the results of a specific SPARQL query from the Fluo table. - * - * @param fluoClient- A connection to the Fluo table where the results reside. (not null) - * @param sparql - This query's results will be fetched. (not null) - * @return The binding sets for the query's results. - */ - protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) { - final Set<BindingSet> bindingSets = new HashSet<>(); - - try (Snapshot snapshot = fluoClient.newSnapshot()) { - final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString(); - - // Fetch the query's variable order. 
- final QueryMetadata queryMetadata = new FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId); - final VariableOrder varOrder = queryMetadata.getVariableOrder(); - - CellScanner cellScanner = snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build(); - final BindingSetStringConverter converter = new BindingSetStringConverter(); - - Iterator<RowColumnValue> iter = cellScanner.iterator(); - - while (iter.hasNext()) { - final String bindingSetString = iter.next().getsValue(); - final BindingSet bindingSet = converter.convert(bindingSetString, varOrder); - bindingSets.add(bindingSet); - } - } - - return bindingSets; - } - - private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - final File miniDataDir = Files.createTempDir(); - - // Setup and start the Mini Accumulo. - final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD); - cluster = new MiniAccumuloCluster(cfg); - cluster.start(); - - // Store a connector to the Mini Accumulo. - instanceName = cluster.getInstanceName(); - zookeepers = cluster.getZooKeepers(); - - final Instance instance = new ZooKeeperInstance(instanceName, zookeepers); - accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD)); - } - - /** - * Sets up a Rya instance. - */ - protected static RyaSailRepository setupRya(final String instanceName, final String zookeepers) throws Exception { - checkNotNull(instanceName); - checkNotNull(zookeepers); - - // Install the Rya instance to the mini accumulo cluster. 
- final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), accumuloConn); - - ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() - .setEnableTableHashPrefix(false) - .setEnableFreeTextIndex(true) - .setEnableEntityCentricIndex(true) - .setEnableGeoIndex(true) - .setEnableTemporalIndex(true) - .setEnablePcjIndex(true) - .setFluoPcjAppName(FLUO_APP_NAME) - .build()); - - // Connect to the Rya instance that was just installed. - final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); - final Sail sail = RyaSailFactory.getInstance(conf); - final RyaSailRepository ryaRepo = new RyaSailRepository(sail); - return ryaRepo; - } - - protected static AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_INSTANCE_NAME); - // Accumulo connection information. - conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD); - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); - conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, ""); - // PCJ configuration information. - conf.set(ConfigUtils.USE_PCJ, "true"); - conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); - conf.set(ConfigUtils.FLUO_APP_NAME, FLUO_APP_NAME); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - - return conf; - } - - protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { - // Setup the observers that will be used by the Fluo PCJ Application. 
- final List<ObserverSpecification> observers = new ArrayList<>(); - observers.add(new ObserverSpecification(TripleObserver.class.getName())); - observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); - observers.add(new ObserverSpecification(JoinObserver.class.getName())); - observers.add(new ObserverSpecification(FilterObserver.class.getName())); - - // Set export details for exporting from Fluo to a Rya repository and a subscriber queue. - final HashMap<String, String> exportParams = new HashMap<>(); - setExportParameters(exportParams); - - // Configure the export observer to export new PCJ results to the mini accumulo cluster. - final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); - observers.add(exportObserverConfig); - - // Configure how the mini fluo will run. - final FluoConfiguration config = new FluoConfiguration(); - config.setMiniStartAccumulo(false); - config.setAccumuloInstance(instanceName); - config.setAccumuloUser(ACCUMULO_USER); - config.setAccumuloPassword(ACCUMULO_PASSWORD); - config.setInstanceZookeepers(zookeepers + "/fluo"); - config.setAccumuloZookeepers(zookeepers); - - config.setApplicationName(FLUO_APP_NAME); - config.setAccumuloTable("fluo" + FLUO_APP_NAME); - - config.addObservers(observers); - - FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) ); - return FluoFactory.newMiniFluo(config); - } - - /** - * Set export details for exporting from Fluo to a Rya repository and a subscriber queue. - * Override this if you have custom export destinations. 
- * - * @param exportParams - */ - protected void setExportParameters(final HashMap<String, String> exportParams) { - final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); - ryaParams.setAccumuloInstanceName(instanceName); - ryaParams.setZookeeperServers(zookeepers); - ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); - ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java new file mode 100644 index 0000000..cd84cb9 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo; + +import static org.junit.Assert.assertEquals; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import 
org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.sail.Sail; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +/** + * The base Integration Test class used for Fluo applications that export to a Kafka topic. + */ +public class KafkaExportITBase extends AccumuloExportITBase { + + protected static final String RYA_INSTANCE_NAME = "test_"; + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private ZkUtils zkUtils; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + // The Rya instance statements are written to that will be fed into the Fluo app. + private RyaSailRepository ryaSailRepo = null; + + /** + * Add info about the Kafka queue/topic to receive the export. + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) + */ + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. 
+ final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + + // Configure the export observer to export new PCJ results to the mini accumulo cluster. + final HashMap<String, String> exportParams = new HashMap<>(); + + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(true); + + // Configure the Kafka Producer + final Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + kafkaParams.addAllProducerConfig(producerConfig); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); + observers.add(exportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + + /** + * setup mini kafka and call the super to setup mini fluo + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources() + */ + @Before + public void setupKafka() throws Exception { + // Install an instance of Rya on the Accumulo cluster. + installRyaInstance(); + + // Setup Kafka. 
+ zkServer = new EmbeddedZookeeper(); + final String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + final Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + } + + @After + public void teardownRya() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Uninstall the instance of Rya. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); + + // Shutdown the repo. + ryaSailRepo.shutDown(); + } + + private void installRyaInstance() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Install the Rya instance to the mini accumulo cluster. 
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableFreeTextIndex(false) + .setEnableEntityCentricIndex(false) + .setEnableGeoIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(true) + .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() ) + .build()); + + // Connect to the Rya instance that was just installed. + final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); + final Sail sail = RyaSailFactory.getInstance(conf); + ryaSailRepo = new RyaSailRepository(sail); + } + + protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + + // Accumulo connection information. + conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); + conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); + conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); + conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); + conf.setAuths(""); + + + // PCJ configuration information. 
+ conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + + conf.setDisplayQueryPlan(true); + + return conf; + } + + /** + * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into. + */ + protected RyaSailRepository getRyaSailRepository() throws Exception { + return ryaSailRepo; + } + + /** + * Shuts down the Kafka mini server, closes the ZooKeeper client, and shuts down the mini ZooKeeper server. + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() + */ + @After + public void teardownKafka() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + /** + * Tests Kafka without any Rya code to make sure Kafka works in this environment. + * If this test fails then it's a testing environment issue, not an issue with Rya.
+ * Source: https://github.com/asmaier/mini-kafka + */ + @Test + public void embeddedKafkaTest() throws Exception { + // create topic + final String topic = "testTopic"; + AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + // setup producer + final Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps); + + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty("group.id", "group0"); + consumerProps.setProperty("client.id", "consumer0"); + consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put("auto.offset.reset", "earliest"); + + final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(topic)); + + // send message + final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8)); + producer.send(data); + producer.close(); + + // starting consumer + final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); + assertEquals(1, records.count()); + final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); + final ConsumerRecord<Integer, byte[]> record = recordIterator.next(); + assertEquals(42, (int) record.key()); + 
assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + consumer.close(); + } + + protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) { + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TopicName)); + return consumer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java new file mode 100644 index 0000000..5fe999f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import 
org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.openrdf.sail.Sail; + +/** + * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index. + */ +public class RyaExportITBase extends AccumuloExportITBase { + + protected static final String RYA_INSTANCE_NAME = "test_"; + + private RyaSailRepository ryaSailRepo = null; + + public RyaExportITBase() { + // Indicates that MiniFluo should be started before each test. + super(true); + } + + @BeforeClass + public static void setupLogging() { + BasicConfigurator.configure(); + Logger.getRootLogger().setLevel(Level.ERROR); + } + + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + + // Configure the export observer to export new PCJ results to the mini accumulo cluster. 
+ final HashMap<String, String> exportParams = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); + ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); + ryaParams.setExporterUsername(ACCUMULO_USER); + ryaParams.setExporterPassword(ACCUMULO_PASSWORD); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); + observers.add(exportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + + @Before + public void setupRya() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Install the Rya instance to the mini accumulo cluster. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableFreeTextIndex(false) + .setEnableEntityCentricIndex(false) + .setEnableGeoIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(true) + .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() ) + .build()); + + // Connect to the Rya instance that was just installed. 
+ final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); + final Sail sail = RyaSailFactory.getInstance(conf); + ryaSailRepo = new RyaSailRepository(sail); + } + + @After + public void teardownRya() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Uninstall the instance of Rya. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); + + // Shutdown the repo. + ryaSailRepo.shutDown(); + } + + /** + * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into. + */ + protected RyaSailRepository getRyaSailRepository() throws Exception { + return ryaSailRepo; + } + + protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + + // Accumulo connection information. + conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); + conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); + conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); + conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); + conf.setAuths(""); + + // PCJ configuration information. 
+ conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + + conf.setDisplayQueryPlan(true); + + return conf; + } +} \ No newline at end of file