http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java new file mode 100644 index 0000000..df59cb5 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.BindingSet; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Transaction; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.data.Span; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.TypedTransaction; + +/** + * Deletes a Pre-computed Join (PCJ) from Fluo. + * <p> + * This is a two phase process. + * <ol> + * <li>Delete metadata about each node of the query using a single Fluo + * transaction. This prevents new {@link BindingSet}s from being created when + * new triples are inserted.</li> + * <li>Delete BindingSets associated with each node of the query. This is done + * in a batch fashion to guard against large delete transactions that don't fit + * into memory.</li> + * </ol> + */ +@ParametersAreNonnullByDefault +public class DeletePcj { + + private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + private final int batchSize; + + /** + * Constructs an instance of {@link DeletePcj}. + * + * @param batchSize - The number of entries that will be deleted at a time. 
(> 0) + */ + public DeletePcj(final int batchSize) { + checkArgument(batchSize > 0); + this.batchSize = batchSize; + } + + /** + * Deletes all metadata and {@link BindingSet}s associated with a Rya + * Precomputed Join Index from the Fluo application that is incrementally + * updating it. + * + * @param client - Connects to the Fluo application that is updating the PCJ Index. (not null) + * @param pcjId - The PCJ ID for the query that will removed from the Fluo application. (not null) + */ + public void deletePcj(final FluoClient client, final String pcjId) { + requireNonNull(client); + requireNonNull(pcjId); + + final Transaction tx = client.newTransaction(); + + // Delete the query's metadata. This halts input. + final List<String> nodeIds = getNodeIds(tx, pcjId); + deleteMetadata(tx, nodeIds, pcjId); + + // Delete the binding sets associated with the query's nodes. + for (final String nodeId : nodeIds) { + deleteData(client, nodeId); + } + } + + /** + * This method retrieves all of the nodeIds that are part of the query with + * specified pcjId. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param pcjId - Id of query. (not null) + * @return list of Node IDs associated with the query {@code pcjId}. + */ + private List<String> getNodeIds(Transaction tx, String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + // Get the ID that tracks the query within the Fluo application. + final String queryId = getQueryIdFromPcjId(tx, pcjId); + + // Get the query's children nodes. + final List<String> nodeIds = new ArrayList<>(); + nodeIds.add(queryId); + getChildNodeIds(tx, queryId, nodeIds); + return nodeIds; + } + + /** + * Recursively navigate query tree to extract all of the nodeIds. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param nodeId - Current node in query tree. (not null) + * @param nodeIds - The Node IDs extracted from query tree. 
(not null) + */ + private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) { + requireNonNull(tx); + requireNonNull(nodeId); + requireNonNull(nodeIds); + + final NodeType type = NodeType.fromNodeId(nodeId).get(); + switch (type) { + case QUERY: + final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId); + final String queryChild = queryMeta.getChildNodeId(); + nodeIds.add(queryChild); + getChildNodeIds(tx, queryChild, nodeIds); + break; + case JOIN: + final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId); + final String lchild = joinMeta.getLeftChildNodeId(); + final String rchild = joinMeta.getRightChildNodeId(); + nodeIds.add(lchild); + nodeIds.add(rchild); + getChildNodeIds(tx, lchild, nodeIds); + getChildNodeIds(tx, rchild, nodeIds); + break; + case FILTER: + final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId); + final String filterChild = filterMeta.getChildNodeId(); + nodeIds.add(filterChild); + getChildNodeIds(tx, filterChild, nodeIds); + break; + case STATEMENT_PATTERN: + break; + } + } + + /** + * Deletes metadata for all nodeIds associated with a given queryId in a + * single transaction. Prevents additional BindingSets from being created as + * new triples are added. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param nodeIds - Nodes whose metadata will be deleted. (not null) + * @param pcjId - The PCJ ID of the query whose metadata will be deleted. 
(not null) + */ + private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) { + requireNonNull(tx); + requireNonNull(nodeIds); + requireNonNull(pcjId); + + try (final TypedTransaction typeTx = new StringTypeLayer().wrap(tx)) { + deletePcjIdAndSparqlMetadata(typeTx, pcjId); + + for (final String nodeId : nodeIds) { + final NodeType type = NodeType.fromNodeId(nodeId).get(); + deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns()); + } + typeTx.commit(); + } + } + + /** + * Deletes all metadata for a Query Node. + * + * @param tx - Transaction the deletes will be performed with. (not null) + * @param nodeId - The Node ID of the query node to delete. (not null) + * @param columns - The columns that will be deleted. (not null) + */ + private void deleteMetadataColumns(final TypedTransaction tx, final String nodeId, final List<Column> columns) { + requireNonNull(tx); + requireNonNull(columns); + requireNonNull(nodeId); + + final Bytes row = Bytes.of(nodeId); + for (final Column column : columns) { + tx.delete(row, column); + } + } + + + /** + * Deletes high level query meta for converting from queryId to pcjId and vice + * versa, as well as converting from sparql to queryId. + * + * @param tx - Transaction the deletes will be performed with. (not null) + * @param pcjId - The PCJ whose metadata will be deleted. (not null) + */ + private void deletePcjIdAndSparqlMetadata(final TypedTransaction tx, final String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + final String queryId = getQueryIdFromPcjId(tx, pcjId); + final String sparql = getSparqlFromQueryId(tx, queryId); + tx.delete(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID); + tx.delete(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); + tx.delete(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID); + } + + + /** + * Deletes all BindingSets associated with the specified nodeId. + * + * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. 
(not null) + * @param client - Used to delete the data. (not null) + */ + private void deleteData(final FluoClient client, final String nodeId) { + requireNonNull(client); + requireNonNull(nodeId); + + final NodeType type = NodeType.fromNodeId(nodeId).get(); + Transaction tx = client.newTransaction(); + while(deleteDataBatch(tx, getIterator(tx, nodeId, type.getBsColumn()), type.getBsColumn())) { + tx = client.newTransaction(); + } + } + + private RowIterator getIterator(final Transaction tx, final String nodeId, final Column column) { + requireNonNull(tx); + requireNonNull(nodeId); + requireNonNull(column); + + ScannerConfiguration sc1 = new ScannerConfiguration(); + sc1.fetchColumn(column.getFamily(), column.getQualifier()); + sc1.setSpan(Span.prefix(Bytes.of(nodeId))); + return tx.get(sc1); + } + + private boolean deleteDataBatch(final Transaction tx, final RowIterator iter, final Column column) { + requireNonNull(tx); + requireNonNull(iter); + requireNonNull(column); + + try (final TypedTransaction typeTx = new StringTypeLayer().wrap(tx)) { + int count = 0; + while (iter.hasNext() && count < batchSize) { + final Bytes row = iter.next().getKey(); + count++; + tx.delete(row, column); + } + + final boolean hasNext = iter.hasNext(); + tx.commit(); + return hasNext; + } + } + + private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID); + return queryIdBytes.toString(); + } + + private String getSparqlFromQueryId(final Transaction tx, final String queryId) { + requireNonNull(tx); + requireNonNull(queryId); + + final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId); + return metadata.getSparql(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java index e904afa..7a0a953 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java @@ -18,18 +18,17 @@ */ package org.apache.rya.indexing.pcj.fluo.api; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.accumulo.core.client.Connector; import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import io.fluo.api.client.FluoClient; import io.fluo.api.data.Bytes; @@ -46,7 +45,7 @@ public class GetPcjMetadata { * Get the {@link PcjMetadata} of all queries that are being maintained by * the Fluo app. * - * @param accumulo - The Accumulo instance that will be searched. (not null) + * @param pcjStorage - The PCJ Storage that will be searched. (not null) * @param fluo - The Fluo instance that will be searched. (not null) * @return A map where the query ID is the key and its metadata is the value. 
* @throws NotInFluoException A query Id does not have a PCJ export able @@ -54,15 +53,15 @@ public class GetPcjMetadata { * @throws NotInAccumuloException A PCJ export table that was found either * does not exist in Accumulo or it is not a PCJ table. */ - public Map<String, PcjMetadata> getMetadata(final Connector accumulo, final FluoClient fluo) throws NotInFluoException, NotInAccumuloException { - checkNotNull(accumulo); - checkNotNull(fluo); + public Map<String, PcjMetadata> getMetadata(final PrecomputedJoinStorage pcjStorage, final FluoClient fluo) throws NotInFluoException, NotInAccumuloException { + requireNonNull(pcjStorage); + requireNonNull(fluo); final Map<String, PcjMetadata> metadata = new HashMap<>(); final Collection<String> queryIds = listQueryIds.listQueryIds(fluo); for(final String queryId : queryIds) { - metadata.put(queryId, getMetadata(accumulo, fluo, queryId)); + metadata.put(queryId, getMetadata(pcjStorage, fluo, queryId)); } return metadata; @@ -72,7 +71,7 @@ public class GetPcjMetadata { * Get the {@link PcjMetadata} of a query that is being maintained by the * Fluo app. * - * @param accumulo - The Accumulo instance that will be searched. (not null) + * @param pcjStorage - The PCJ Storage that will be searched. (not null) * @param fluo - The Fluo instance that will be searched. (not null) * @param queryId - The Query Id whose metadata will be fetched. (not null) * @return The {@link PcjMetadata} of the query. @@ -81,28 +80,28 @@ public class GetPcjMetadata { * @throws NotInAccumuloException The PCJ export table that was found either * does not exist in Accumulo or it is not a PCJ table. 
*/ - public PcjMetadata getMetadata(final Connector accumulo, final FluoClient fluo, final String queryId) throws NotInFluoException, NotInAccumuloException { - checkNotNull(accumulo); - checkNotNull(fluo); - checkNotNull(queryId); + public PcjMetadata getMetadata(final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, final String queryId) throws NotInFluoException, NotInAccumuloException { + requireNonNull(pcjStorage); + requireNonNull(fluo); + requireNonNull(queryId); - // Lookup the Accumulo export table name in the Fluo table. - String pcjTableName = null; + // Lookup the Rya PCJ ID associated with the query. + String pcjId = null; try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() ) ) { - final Bytes pcjTableNameBytes = snap.get(Bytes.of(queryId), FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME); - if(pcjTableNameBytes == null) { + final Bytes pcjIdBytes = snap.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID); + if(pcjIdBytes == null) { throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId + - "' because a PCJ export table name was not stored in the Fluo table."); + "' because a Rya PCJ ID not stored in the Fluo table."); } - pcjTableName = pcjTableNameBytes.toString(); + pcjId = pcjIdBytes.toString(); } - // Fetch the metadata from the Accumulo table. + // Fetch the metadata from the storage. 
try { - return new PcjTables().getPcjMetadata(accumulo, pcjTableName); + return pcjStorage.getPcjMetadata(pcjId); } catch (final PcjException e) { throw new NotInAccumuloException("Could not get the PcjMetadata for queryId '" + queryId + - "' because the metadata was missing from the Accumulo table.", e); + "' because the metadata was missing from the Rya storage.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java index 092ad9c..2a53a0e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java @@ -59,6 +59,8 @@ public class InsertTriples { private static final Encoder ENCODER = new StringEncoder(); + // TODO visibility is part of RyaStatement. Put it there instead. + /** * Inserts a triple into Fluo. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index 54001b1..9c7f672 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -42,6 +42,12 @@ under the License. 
<dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.indexing.pcj</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> </dependency> <!-- 3rd Party Runtime Dependencies. --> @@ -49,6 +55,16 @@ under the License. <groupId>io.fluo</groupId> <artifactId>fluo-api</artifactId> </dependency> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-core</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> <!-- Testing dependencies. --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java index e9f6243..c6ad31e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/NodeType.java @@ -18,22 +18,62 @@ */ package org.apache.rya.indexing.pcj.fluo.app; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import 
org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QueryNodeMetadataColumns; +import org.openrdf.query.BindingSet; + import com.google.common.base.Optional; +import io.fluo.api.data.Column; + /** * Represents the different types of nodes that a Query may have. */ public enum NodeType { - FILTER, - JOIN, - STATEMENT_PATTERN, - QUERY; + FILTER (QueryNodeMetadataColumns.FILTER_COLUMNS, FluoQueryColumns.FILTER_BINDING_SET), + JOIN(QueryNodeMetadataColumns.JOIN_COLUMNS, FluoQueryColumns.JOIN_BINDING_SET), + STATEMENT_PATTERN(QueryNodeMetadataColumns.STATEMENTPATTERN_COLUMNS, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET), + QUERY(QueryNodeMetadataColumns.QUERY_COLUMNS, FluoQueryColumns.QUERY_BINDING_SET); + + //Metadata Columns associated with given NodeType + private QueryNodeMetadataColumns metadataColumns; + + //Column where BindingSet results are stored for given NodeType + private Column bindingSetColumn; + + /** + * Constructs an instance of {@link NodeType}. + * + * @param metadataColumns - Metadata {@link Column}s associated with this {@link NodeType}. (not null) + * @param bindingSetColumn - The {@link Column} used to store this {@link NodeType}'s {@link BindingSet}s. (not null) + */ + private NodeType(QueryNodeMetadataColumns metadataColumns, Column bindingSetColumn) { + this.metadataColumns = requireNonNull(metadataColumns); + this.bindingSetColumn = requireNonNull(bindingSetColumn); + } + + /** + * @return Metadata {@link Column}s associated with this {@link NodeType}. + */ + public List<Column> getMetaDataColumns() { + return metadataColumns.columns(); + } + + + /** + * @return The {@link Column} used to store this {@link NodeType}'s {@link BindingSet}s. + */ + public Column getBsColumn() { + return bindingSetColumn; + } /** * Get the {@link NodeType} of a node based on its Node ID. @@ -43,7 +83,7 @@ public enum NodeType { * node's ID, otherwise absent. 
*/ public static Optional<NodeType> fromNodeId(final String nodeId) { - checkNotNull(nodeId); + requireNonNull(nodeId); NodeType type = null; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java index 525f9c3..6f71a48 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java @@ -43,6 +43,8 @@ public class RyaExportParameters extends ParametersBase { public static final String CONF_EXPORTER_USERNAME = "pcj.fluo.export.rya.exporterUsername"; public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword"; + public static final String CONF_RYA_INSTANCE_NAME = "pcj.fluo.export.rya.ryaInstanceName"; + /** * Constructs an instance of {@link RyaExportParameters}. * @@ -124,6 +126,20 @@ public class RyaExportParameters extends ParametersBase { } /** + * @return The name of the Rya instance this application is updating. + */ + public Optional<String> getRyaInstanceName() { + return Optional.fromNullable( params.get(CONF_RYA_INSTANCE_NAME) ); + } + + /** + * @param ryaInstanceName - The name of the Rya instance this application is updating. + */ + public void setRyaInstanceName(@Nullable final String ryaInstanceName) { + params.put(CONF_RYA_INSTANCE_NAME, ryaInstanceName); + } + + /** * @return The password that will be used to export PCJ * results to the destination Accummulo table. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java index f9af15c..11245c0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java @@ -22,11 +22,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collections; -import org.apache.accumulo.core.client.Connector; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import io.fluo.api.data.Bytes; @@ -37,18 +36,15 @@ import io.fluo.api.types.TypedTransactionBase; */ public class RyaResultExporter implements IncrementalResultExporter { - private final Connector accumuloConn; - private final PcjTables pcjTables; + private final PrecomputedJoinStorage pcjStorage; /** * Constructs an instance of {@link RyaResultExporter}. * - * @param accumuloConn - A connection to the Accumulo instance that hosts Rya PCJ tables. (not null) - * @param pcjTables - A utility used to interact with Rya's PCJ tables. 
(not null) + * @param pcjStorage - The PCJ storage the new results will be exported to. (not null) */ - public RyaResultExporter(final Connector accumuloConn, final PcjTables pcjTables) { - this.accumuloConn = checkNotNull(accumuloConn); - this.pcjTables = checkNotNull(pcjTables); + public RyaResultExporter(final PrecomputedJoinStorage pcjStorage) { + this.pcjStorage = checkNotNull(pcjStorage); } @Override @@ -60,14 +56,12 @@ public class RyaResultExporter implements IncrementalResultExporter { checkNotNull(queryId); checkNotNull(result); - // Get the name of the table the PCJ results will be written to. - final String pcjTableName = fluoTx.get(Bytes.of(queryId), FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); + final String pcjId = fluoTx.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID).toString(); - // Write the result to the PCJ table. try { - pcjTables.addResults(accumuloConn, pcjTableName, Collections.singleton(result)); - } catch (final PcjException e) { - throw new ResultExportException("A result could not be exported to the PCJ table in Accumulo.", e); + pcjStorage.addResults(pcjId, Collections.singleton(result)); + } catch (final PCJStorageException e) { + throw new ResultExportException("A result could not be exported to Rya.", e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java index 6550f66..dfbb910 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java 
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java @@ -28,7 +28,8 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import com.google.common.base.Optional; @@ -47,16 +48,23 @@ public class RyaResultExporterFactory implements IncrementalResultExporterFactor final RyaExportParameters params = new RyaExportParameters( context.getParameters() ); if(params.isExportToRya()) { + // Setup Zookeeper connection info. final String accumuloInstance = params.getAccumuloInstanceName().get(); final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ","); final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers); try { + // Setup Accumulo connection info. final String exporterUsername = params.getExporterUsername().get(); final String exporterPassword = params.getExporterPassword().get(); final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword)); - final IncrementalResultExporter exporter = new RyaResultExporter(accumuloConn, new PcjTables()); + // Setup Rya PCJ Storage. + final String ryaInstanceName = params.getRyaInstanceName().get(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName); + + // Make the exporter. 
+ final IncrementalResultExporter exporter = new RyaResultExporter(pcjStorage); return Optional.of(exporter); } catch (final AccumuloException | AccumuloSecurityException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index be24ac9..e90496a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -18,6 +18,13 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.List; + +import javax.annotation.ParametersAreNonnullByDefault; + import io.fluo.api.data.Column; /** @@ -26,32 +33,32 @@ import io.fluo.api.data.Column; * See the table bellow for information specific to each metadata model. 
* <p> * <b>Query Metadata</b> - * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> - * <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query..</td> </tr> + * <tr> <td>Node ID</td> <td>queryMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> * <tr> <td>Node ID</td> <td>queryMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>queryMetadata:bindingSet</td> <td>A Binding Set that matches the query.</td> </tr> - * </table> + * </table> * </p> * <p> * <b>Filter Metadata</b> * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:veriableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:originalSparql</td> <td>The original SPRAQL query this filter was derived from.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> <td>Indicates which filter within the original SPARQL query this represents.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node ID this filter emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node ID of the node that 
feeds this node Binding Sets.</td> </tr> - * <tr> <td>Node ID + DELIM + Binding set String </td> <td>filterMetadata:bindingSet</td> <td>A Binding Set that matches the Filter.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding set String</td> <td>filterMetadata:bindingSet</td> <td>A Binding Set that matches the Filter.</td> </tr> * </table> - * <p> + * </p> * <p> * <b>Join Metadata</b> * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:nodeId</td> <td>The Node ID of the Join.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>joinMetadata:joinType</td> <td>The Join algorithm that will be used when computing join results.</td> </tr> @@ -60,11 +67,11 @@ import io.fluo.api.data.Column; * <tr> <td>Node ID</td> <td>joinMetadata:rightChildNodeId</td> <td>A Node ID of the node that feeds this node Binding Sets.</td> </tr> * <tr> <td>Node ID + DELIM + Binding set String</td> <td>joinMetadata:bindingSet</td> <td>A Binding Set that matches the Join.</td> </tr> * </table> - * <p> + * </p> * <p> * <b>Statement Pattern Metadata</b> * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>statementPatternMetadata:nodeId</td> <td>The Node ID of the Statement Pattern.</td> </tr> * <tr> <td>Node ID</td> <td>statementPatternMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> * <tr> <td>Node ID</td> <td>statementPatternMetadata:pattern</td> <td>The pattern that defines which Statements will be matched.</td> </tr> @@ -81,34 +88,44 @@ public class FluoQueryColumns { public static final String 
JOIN_METADATA_CF = "joinMetadata"; public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; - /** - * New triples that have been added to Rya are written as a row in this - * column so that any queries that include them in their results will be - * updated. - * <p> - * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr> - * </table> - */ - public static final Column TRIPLES = new Column("triples", "SPO"); - - /** - * Stores the name of the Accumulo table the query's results will be stored. - * The table's structure is defined by Rya's. - * </p> - * <table border="1" style="width:100%"> - * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> - * <tr> <td>Query ID</td> <td>query:ryaExportTableName</td> - * <td>The name of the Accumulo table the results will be exported to using - * the Rya PCJ table structure.</td> </tr> - * </table> - */ - public static final Column QUERY_RYA_EXPORT_TABLE_NAME = new Column("query", "ryaExportTableName"); - - - // Sparql to Query ID used to list all queries that are in the system. - public static final Column QUERY_ID = new Column("sparql", "queryId"); + /** + * New triples that have been added to Rya are written as a row in this + * column so that any queries that include them in their results will be + * updated. + * <p> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Core Rya SPO formatted triple</td> <td>triples:SPO</td> <td>visibility</td> </tr> + * </table> + * </p> + */ + public static final Column TRIPLES = new Column("triples", "SPO"); + + /** + * Stores the Rya assigned PCJ ID that the query's results reflect. This + * value defines where the results will be exported to. 
+ * <p> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Query ID</td> <td>query:ryaPcjId</td> <td>Identifies which PCJ the results of this query will be exported to.</td> </tr> + * </table> + * </p> + */ + public static final Column RYA_PCJ_ID = new Column("query", "ryaPcjId"); + + /** + * Associates a PCJ ID with a Query ID. This enables a quick lookup of the Query ID from the PCJ ID and is useful for deleting PCJs. + * <p> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>PCJ ID</td> <td>ryaPcjId:queryId</td> <td>Identifies which Query ID is associated with the given PCJ ID.</td> </tr> + * </table> + * </p> + */ + public static final Column PCJ_ID_QUERY_ID = new Column("ryaPcjId", "queryId"); + + // Sparql to Query ID used to list all queries that are in the system. + public static final Column QUERY_ID = new Column("sparql", "queryId"); // Query Metadata columns. public static final Column QUERY_NODE_ID = new Column(QUERY_METADATA_CF, "nodeId"); @@ -141,4 +158,69 @@ public class FluoQueryColumns { public static final Column STATEMENT_PATTERN_PATTERN = new Column(STATEMENT_PATTERN_METADATA_CF, "pattern"); public static final Column STATEMENT_PATTERN_PARENT_NODE_ID = new Column(STATEMENT_PATTERN_METADATA_CF, "parentNodeId"); public static final Column STATEMENT_PATTERN_BINDING_SET = new Column(STATEMENT_PATTERN_METADATA_CF, "bindingSet"); + + /** + * Enumerates the {@link Column}s that hold all of the fields for each type + * of node that can compose a query. + */ + @ParametersAreNonnullByDefault + public enum QueryNodeMetadataColumns { + /** + * The columns a {@link QueryMetadata} object's fields are stored within. 
+ */ + QUERY_COLUMNS( + Arrays.asList(QUERY_NODE_ID, + QUERY_VARIABLE_ORDER, + QUERY_SPARQL, + QUERY_CHILD_NODE_ID)), + + /** + * The columns a {@link FilterMetadata} object's fields are stored within. + */ + FILTER_COLUMNS( + Arrays.asList(FILTER_NODE_ID, + FILTER_VARIABLE_ORDER, + FILTER_ORIGINAL_SPARQL, + FILTER_INDEX_WITHIN_SPARQL, + FILTER_PARENT_NODE_ID, + FILTER_CHILD_NODE_ID)), + + /** + * The columns a {@link JoinMetadata} object's fields are stored within. + */ + JOIN_COLUMNS( + Arrays.asList(JOIN_NODE_ID, + JOIN_VARIABLE_ORDER, + JOIN_TYPE, + JOIN_PARENT_NODE_ID, + JOIN_LEFT_CHILD_NODE_ID, + JOIN_RIGHT_CHILD_NODE_ID)), + + /** + * The columns a {@link StatementPatternMetadata} object's fields are stored within. + */ + STATEMENTPATTERN_COLUMNS( + Arrays.asList(STATEMENT_PATTERN_NODE_ID, + STATEMENT_PATTERN_VARIABLE_ORDER, + STATEMENT_PATTERN_PATTERN, + STATEMENT_PATTERN_PARENT_NODE_ID)); + + private List<Column> columns; + + /** + * Constructs an instance of {@link QueryNodeMetadataColumns}. + * + * @param columns - The {@link Column}s associated with this node's metadata. (not null) + */ + private QueryNodeMetadataColumns(List<Column> columns) { + this.columns = requireNonNull(columns); + } + + /** + * @return The {@link Column}s associated with this node's metadata. 
+ */ + public List<Column> columns() { + return columns; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index c3ff0a0..3adff7f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -163,6 +163,7 @@ public class SparqlFluoQueryBuilder { // Create the unique portion of the id. final String unique = UUID.randomUUID().toString().replaceAll("-", ""); + // Put them together to create the Node ID. return prefix + "_" + unique; } @@ -218,9 +219,9 @@ public class SparqlFluoQueryBuilder { } @Override - public void meet(LeftJoin node) { + public void meet(final LeftJoin node) { // Extract the metadata that will be stored for the node. 
- String leftJoinNodeId = nodeIds.getOrMakeId(node); + final String leftJoinNodeId = nodeIds.getOrMakeId(node); final QueryModelNode left = node.getLeftArg(); final QueryModelNode right = node.getRightArg(); @@ -245,7 +246,7 @@ public class SparqlFluoQueryBuilder { super.meet(node); } - private void makeJoinMetadata(String joinNodeId, JoinType joinType, QueryModelNode left, QueryModelNode right) { + private void makeJoinMetadata(final String joinNodeId, final JoinType joinType, final QueryModelNode left, final QueryModelNode right) { final String leftChildNodeId = nodeIds.getOrMakeId(left); final String rightChildNodeId = nodeIds.getOrMakeId(right); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml index 8c2f5b3..8746154 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.client/pom.xml @@ -40,6 +40,10 @@ under the License. <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.api</artifactId> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> <!-- 3rd Party Runtime Dependencies. 
--> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java index 731e182..3e84499 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/ListQueriesCommand.java @@ -34,6 +34,8 @@ import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.client.util.PcjMetadataRenderer; import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -96,12 +98,13 @@ public class ListQueriesCommand implements PcjAdminClientCommand { final GetPcjMetadata getPcjMetadata = new GetPcjMetadata(); final Map<String, PcjMetadata> metadata = new HashMap<String, PcjMetadata>(); try { + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumulo, ryaTablePrefix); if(params.queryId != null) { log.trace("Fetch the PCJ Metadata from Accumulo for Query ID '" + params.queryId + "'."); - metadata.put(params.queryId, getPcjMetadata.getMetadata(accumulo, fluo, params.queryId)); + metadata.put(params.queryId, getPcjMetadata.getMetadata(pcjStorage, fluo, params.queryId)); } else { log.trace("Fetch the 
PCJ Metadata from Accumulo for all queries that are being updated by Fluo."); - metadata.putAll( getPcjMetadata.getMetadata(accumulo, fluo) ); + metadata.putAll( getPcjMetadata.getMetadata(pcjStorage, fluo) ); } } catch (NotInFluoException | NotInAccumuloException e) { throw new ExecutionException("Could not fetch some of the metadata required to build the report.", e); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java index 9ac87c7..ad66757 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -35,6 +35,8 @@ import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; import org.openrdf.sail.SailException; @@ -114,7 +116,15 @@ public class NewQueryCommand implements PcjAdminClientCommand { log.trace("Loading these values into the Fluo app."); final CreatePcj createPcj = new CreatePcj(); try { - createPcj.withRyaIntegration(fluo, ryaTablePrefix, rya, accumulo, request.getVarOrders(), 
request.getQuery()); + // Create the PCJ in Rya. + final String sparql = request.getQuery(); + + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumulo, ryaTablePrefix); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo PCJ Updater app to maintain the PCJ. + createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, rya); + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml index 97a967b..dbabfd8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml @@ -42,6 +42,10 @@ under the License. <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.api</artifactId> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> <!-- 3rd Party Runtime Dependencies. 
--> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java index 5ea5532..1d37831 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.rya.indexing.pcj.fluo.demo.Demo.DemoExecutionException; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; +import com.google.common.base.Optional; import com.google.common.io.Files; import io.fluo.api.client.FluoClient; @@ -57,7 +59,19 @@ import io.fluo.api.config.ObserverConfiguration; import io.fluo.api.mini.MiniFluo; import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import 
mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import mvm.rya.rdftriplestore.RdfCloudTripleStore; import mvm.rya.rdftriplestore.RyaSailRepository; @@ -225,7 +239,7 @@ public class DemoDriver { * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null) * @return The Rya repository sitting on top of the Mini Accumulo. */ - private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException { + private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException, AlreadyInitializedException, RyaDetailsRepositoryException { checkNotNull(accumulo); // Setup the Rya Repository that will be used to create Repository Connections. @@ -234,6 +248,8 @@ public class DemoDriver { crdfdao.setConnector(accumuloConn); // Setup Rya configuration values. + final String ryaInstanceName = "demo_"; + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setTablePrefix("demo_"); conf.setDisplayQueryPlan(true); @@ -250,6 +266,25 @@ public class DemoDriver { final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); ryaRepo.initialize(); + // Create Rya Details for the instance name. 
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, ryaInstanceName); + + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(ryaInstanceName) + .setRyaVersion("0.0.0.0") + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.<Date>absent() ) ) + .setProspectorDetails( new ProspectorDetails( Optional.<Date>absent() )) + .build(); + + detailsRepo.initialize(details); + return ryaRepo; } @@ -276,6 +311,7 @@ public class DemoDriver { ryaParams.setZookeeperServers(accumulo.getZooKeepers()); ryaParams.setExporterUsername("root"); ryaParams.setExporterPassword("password"); + ryaParams.setRyaInstanceName("demo_"); final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName()); exportObserverConfig.setParameters( exportParams ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index 04d4e55..8207966 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -18,31 +18,19 @@ */ package 
org.apache.rya.indexing.pcj.fluo.demo; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.openrdf.model.Statement; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; @@ -55,13 +43,9 @@ import org.openrdf.repository.RepositoryException; import org.openrdf.sail.SailException; import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import io.fluo.api.client.FluoClient; -import io.fluo.api.client.Snapshot; -import 
io.fluo.api.data.Bytes; import io.fluo.api.mini.MiniFluo; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaType; @@ -76,8 +60,6 @@ import mvm.rya.rdftriplestore.RyaSailRepository; public class FluoAndHistoricPcjsDemo implements Demo { private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class); - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - // Employees private static final RyaURI alice = new RyaURI("http://Alice"); private static final RyaURI bob = new RyaURI("http://Bob"); @@ -188,8 +170,15 @@ public class FluoAndHistoricPcjsDemo implements Demo { // 4. Write the query to Fluo and import the historic matches. Wait for the app to finish exporting results. log.info("Telling Fluo to maintain the query and import the historic Statement Pattern matches."); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaTablePrefix); + final String pcjId; try { - new CreatePcj().withRyaIntegration(fluoClient, ryaTablePrefix, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ Index in Rya. + pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain it. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e); } @@ -200,11 +189,14 @@ public class FluoAndHistoricPcjsDemo implements Demo { log.info(""); // 5. Show that the Fluo app exported the results to the PCJ table in Accumulo. 
- final String pcjTableName = getPcjTableName(fluoClient, sparql); - - log.info("The following Binding Sets were exported to the '" + pcjTableName+ "' table in Accumulo:"); - Multimap<String, BindingSet> pcjResults = loadPcjResults(accumuloConn, pcjTableName); - prettyLogPcjResults(pcjResults); + log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:"); + try { + for(final BindingSet result : pcjStorage.listResults(pcjId)) { + log.info(" " + result); + } + } catch (final PCJStorageException e) { + throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e); + } waitForEnter(); // 6. Introduce some new Statements that we will stream into the Fluo app. @@ -262,9 +254,14 @@ public class FluoAndHistoricPcjsDemo implements Demo { log.info(""); // 8. Show the new results have been exported to the PCJ table in Accumulo. - log.info("The following Binding Sets were expolrted to the '" + pcjTableName+ "' table in Accumulo:"); - pcjResults = loadPcjResults(accumuloConn, pcjTableName); - prettyLogPcjResults(pcjResults); + log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:"); + try { + for(final BindingSet result : pcjStorage.listResults(pcjId)) { + log.info(" " + result); + } + } catch (final PCJStorageException e) { + throw new DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e); + } log.info(""); } @@ -325,50 +322,4 @@ public class FluoAndHistoricPcjsDemo implements Demo { } } } - - private static String getPcjTableName(final FluoClient fluoClient, final String sparql) { - try(Snapshot snap = fluoClient.newSnapshot()) { - final Bytes queryId = snap.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); - return snap.get(queryId, FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); - } - } - - /** - * Scan accumulo for the results that are stored in a PCJ tablle. 
The - * multimap stores a set of deserialized binding sets that were in the PCJ - * table for every variable order that is found in the PCJ metadata. - */ - private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws DemoExecutionException { - final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); - - try { - // Get the variable orders the data was written to. - final PcjTables pcjs = new PcjTables(); - final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - - // Scan Accumulo for the stored results. - for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { - final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); - scanner.fetchColumnFamily( new Text(varOrder.toString()) ); - - for(final Entry<Key, Value> entry : scanner) { - final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = converter.convert(serializedResult, varOrder); - fetchedResults.put(varOrder.toString(), result); - } - } - } catch(PcjException | TableNotFoundException | BindingSetConversionException e) { - throw new DemoExecutionException("Couldn't fetch the binding sets that were exported to the PCJ table, so the demo can not continue. 
Exiting.", e); - } - - return fetchedResults; - } - - private static void prettyLogPcjResults(final Multimap<String, BindingSet> pcjResults) throws DemoExecutionException { - final String varOrderString = pcjResults.keySet().iterator().next(); - final Collection<BindingSet> reuslts = pcjResults.get(varOrderString); - for(final BindingSet result : reuslts) { - log.info(" " + result); - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index d8b14b2..50ac73c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -63,7 +64,9 @@ import org.openrdf.query.impl.MapBindingSet; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; +import com.google.common.base.Optional; import com.google.common.io.Files; import io.fluo.api.client.FluoAdmin; @@ -80,10 +83,21 @@ import io.fluo.api.iterator.ColumnIterator; import io.fluo.api.iterator.RowIterator; import io.fluo.api.mini.MiniFluo; import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; import 
mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import mvm.rya.api.persist.RyaDAOException; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.accumulo.ConfigUtils; @@ -101,7 +115,7 @@ import mvm.rya.sail.config.RyaSailFactory; public abstract class ITBase { private static final Logger log = Logger.getLogger(ITBase.class); - protected static final String RYA_TABLE_PREFIX = "demo_"; + protected static final String RYA_INSTANCE_NAME = "demo_"; protected static final String ACCUMULO_USER = "root"; protected static final String ACCUMULO_PASSWORD = "password"; @@ -130,7 +144,7 @@ public abstract class ITBase { @Before public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException, - RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException { + RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException, AlreadyInitializedException, RyaDetailsRepositoryException, SailException { // Initialize the Mini Accumulo that will be used to host Rya and Fluo. 
setupMiniAccumulo(); @@ -339,25 +353,12 @@ public abstract class ITBase { } /** - * Sets up a Rya instance - * - * @param user - * @param password - * @param instanceName - * @param zookeepers - * @param appName - * @return - * @throws AccumuloException - * @throws AccumuloSecurityException - * @throws RepositoryException - * @throws RyaDAOException - * @throws NumberFormatException - * @throws UnknownHostException - * @throws InferenceEngineException + * Sets up a Rya instance. + * @throws SailException */ protected static RyaSailRepository setupRya(final String user, final String password, final String instanceName, final String zookeepers, final String appName) throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, - NumberFormatException, UnknownHostException, InferenceEngineException { + NumberFormatException, UnknownHostException, InferenceEngineException, mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException, RyaDetailsRepositoryException, SailException { checkNotNull(user); checkNotNull(password); @@ -367,7 +368,7 @@ public abstract class ITBase { // Setup Rya configuration values. final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_TABLE_PREFIX); + conf.setTablePrefix(RYA_INSTANCE_NAME); conf.setDisplayQueryPlan(true); conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); conf.set(ConfigUtils.CLOUDBASE_USER, user); @@ -384,7 +385,25 @@ public abstract class ITBase { final Sail sail = RyaSailFactory.getInstance(conf); final RyaSailRepository ryaRepo = new RyaSailRepository(sail); - ryaRepo.initialize(); + + // Initialize the Rya Details. 
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, RYA_INSTANCE_NAME); + + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(RYA_INSTANCE_NAME) + .setRyaVersion("0.0.0.0") + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.<Date>absent() ) ) + .setProspectorDetails( new ProspectorDetails( Optional.<Date>absent() )) + .build(); + + detailsRepo.initialize(details); return ryaRepo; } @@ -404,13 +423,11 @@ public abstract class ITBase { /** * Setup a Mini Fluo cluster that uses a temporary directory to store its - * data.ll + * data. * * @return A Mini Fluo cluster. */ protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { -// final File miniDataDir = Files.createTempDir(); - // Setup the observers that will be used by the Fluo PCJ Application. 
final List<ObserverConfiguration> observers = new ArrayList<>(); observers.add(new ObserverConfiguration(TripleObserver.class.getName())); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 124f5a9..41a4b3d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.api; import static org.junit.Assert.assertEquals; -import java.io.File; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; @@ -29,7 +28,6 @@ import org.apache.rya.indexing.pcj.fluo.ITBase; import org.junit.Test; import com.google.common.base.Optional; -import com.google.common.io.Files; import io.fluo.api.client.FluoAdmin; import io.fluo.api.client.FluoAdmin.AlreadyInitializedException; @@ -51,7 +49,6 @@ public class CountStatementsIT extends ITBase { * statements are inserted as part of the test will not be consumed. * * @return A Mini Fluo cluster. 
- * @throws TableExistsException */ @Override protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { @@ -66,21 +63,21 @@ public class CountStatementsIT extends ITBase { config.setAccumuloPassword(ACCUMULO_PASSWORD); config.setInstanceZookeepers(zookeepers + "/fluo"); config.setAccumuloZookeepers(zookeepers); - + config.setApplicationName(appName); config.setAccumuloTable("fluo" + appName); - + config.addObservers(observers); - FluoFactory.newAdmin(config).initialize( - new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); + FluoFactory.newAdmin(config).initialize( + new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); return miniFluo; } @Test - public void test() { + public void countStatements() { // Insert some Triples into the Fluo app. final List<RyaStatement> triples = new ArrayList<>(); triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Alice")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index e41eb7b..94d974d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -26,16 +26,19 @@ import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import 
org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableExistsException; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.junit.Test; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; +import org.openrdf.repository.RepositoryException; import org.openrdf.sail.SailException; import com.google.common.collect.Sets; @@ -46,30 +49,38 @@ import com.google.common.collect.Sets; public class GetPcjMetadataIT extends ITBase { @Test - public void getMetadataByQueryId() throws AccumuloException, AccumuloSecurityException, TableExistsException, PcjException, NotInFluoException, NotInAccumuloException, MalformedQueryException, SailException, QueryEvaluationException { + public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException { final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; - final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, varOrders, sparql); + // Create the PCJ table. 
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); - // Ensure the command returns the correct metadata. - final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); - final PcjMetadata metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient, queryId); + final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId); + // Ensure the command returns the correct metadata. + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); assertEquals(expected, metadata); } @Test - public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException { + public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException { + final CreatePcj createPcj = new CreatePcj(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + // Add a couple of queries to Accumulo. final String q1Sparql = "SELECT ?x " + @@ -77,8 +88,8 @@ public class GetPcjMetadataIT extends ITBase { "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." 
+ "}"; - final Set<VariableOrder> q1VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); - createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q1VarOrders, q1Sparql); + final String q1PcjId = pcjStorage.createPcj(q1Sparql); + createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, ryaRepo); final String q2Sparql = "SELECT ?x ?y " + @@ -86,15 +97,17 @@ public class GetPcjMetadataIT extends ITBase { "?x <http://talksTo> ?y. " + "?y <http://worksAt> <http://Chipotle>." + "}"; - final Set<VariableOrder> q2VarOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x", "y") ); - createPcj.withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, q2VarOrders, q2Sparql); + final String q2PcjId = pcjStorage.createPcj(q2Sparql); + createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, ryaRepo); // Ensure the command returns the correct metadata. final Set<PcjMetadata> expected = new HashSet<>(); + final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql); + final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql); expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders)); expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders)); - final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient); + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); assertEquals(expected, Sets.newHashSet( metadata.values() )); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 7eae3e4..bab74e9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import java.math.BigInteger; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -32,7 +31,8 @@ import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import com.google.common.base.Optional; @@ -74,8 +74,12 @@ public class GetQueryReportIT extends ITBase { makeRyaStatement("http://Frank", "http://worksAt", "http://Burrito Place"), makeRyaStatement("http://Frank", "http://livesIn", "http://Lost County")); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. 
new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -84,7 +88,7 @@ public class GetQueryReportIT extends ITBase { fluo.waitForObservers(); // Fetch the report. - final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(accumuloConn, fluoClient); + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); final Set<String> queryIds = metadata.keySet(); assertEquals(1, queryIds.size()); final String queryId = queryIds.iterator().next(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java index 51f9bf5..8434a8d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -28,7 +28,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.junit.Test; import com.beust.jcommander.internal.Lists; @@ -40,8 +39,6 @@ import io.fluo.api.types.TypedTransaction; */ public class ListQueryIdsIT extends ITBase { - private static final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory(); - /** * This test ensures that when there are PCJ tables in Accumulo as well as * the Fluo table's export 
destinations column, the command for fetching the http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index bfb71a5..5cb68d3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.repository.RepositoryException; import io.fluo.api.client.Snapshot; import io.fluo.api.client.Transaction; @@ -38,7 +39,7 @@ import io.fluo.api.client.Transaction; public class FluoQueryMetadataDAOIT extends ITBase { @Test - public void statementPatternMetadataTest() { + public void statementPatternMetadataTest() throws RepositoryException { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized.
