RYA-142 Stopped using Fluo TypeLayer
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/177c80a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/177c80a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/177c80a1 Branch: refs/heads/master Commit: 177c80a1ed9845f7914610714cb7084d4877cf76 Parents: ca2743a Author: Keith Turner <ke...@deenlo.com> Authored: Thu Oct 6 18:55:34 2016 -0400 Committer: Aaron Mihalik <miha...@alum.mit.edu> Committed: Thu Oct 13 12:55:44 2016 -0400 ---------------------------------------------------------------------- .../rya/indexing/pcj/fluo/api/CreatePcj.java | 22 ++-- .../rya/indexing/pcj/fluo/api/DeletePcj.java | 30 +++--- .../indexing/pcj/fluo/api/GetPcjMetadata.java | 12 +-- .../indexing/pcj/fluo/api/InsertTriples.java | 19 +--- .../rya/indexing/pcj/fluo/api/ListQueryIds.java | 5 +- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 4 - .../pcj/fluo/app/FilterResultUpdater.java | 8 +- .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 9 +- .../pcj/fluo/app/JoinResultUpdater.java | 10 +- .../pcj/fluo/app/QueryResultUpdater.java | 10 +- .../indexing/pcj/fluo/app/StringTypeLayer.java | 29 ------ .../app/export/IncrementalResultExporter.java | 4 +- .../fluo/app/export/rya/RyaResultExporter.java | 7 +- .../fluo/app/observers/BindingSetUpdater.java | 12 +-- .../fluo/app/observers/QueryResultObserver.java | 18 ++-- .../pcj/fluo/app/observers/TripleObserver.java | 30 ++---- .../fluo/app/query/FluoQueryMetadataDAO.java | 100 +++++++++---------- .../indexing/pcj/fluo/api/ListQueryIdsIT.java | 13 ++- pom.xml | 6 -- 19 files changed, 127 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 943a022..d31e578 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -29,7 +29,6 @@ import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -55,7 +54,7 @@ import org.openrdf.sail.SailException; import info.aduna.iteration.CloseableIteration; import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.recipes.core.types.TypedTransaction; +import org.apache.fluo.api.client.Transaction; /** * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. @@ -76,11 +75,6 @@ import org.apache.fluo.recipes.core.types.TypedTransaction; public class CreatePcj { /** - * Wraps Fluo {@link Transaction}s so that we can write String values to them. - */ - private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer(); - - /** * The default Statement Pattern batch insert size is 1000. */ private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; @@ -150,15 +144,15 @@ public class CreatePcj { final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); - try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( fluo.newTransaction() )) { + try(Transaction tx = fluo.newTransaction()) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. final String queryId = fluoQuery.getQueryMetadata().getNodeId(); - tx.mutate().row(queryId).col(FluoQueryColumns.RYA_PCJ_ID).set(pcjId); - tx.mutate().row(pcjId).col(FluoQueryColumns.PCJ_ID_QUERY_ID).set(queryId); - + tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); + tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); + // Flush the changes to Fluo. tx.commit(); } @@ -206,7 +200,7 @@ public class CreatePcj { final BindingSetStringConverter converter = new BindingSetStringConverter(); - try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) { + try(Transaction tx = fluo.newTransaction()) { // Get the node's variable order. final String spNodeId = spMetadata.getNodeId(); final VariableOrder varOrder = spMetadata.getVariableOrder(); @@ -221,9 +215,7 @@ public class CreatePcj { final String bindingSetStr = converter.convert(spBindingSet, varOrder); // Write the binding set entry to Fluo for the statement pattern. - tx.mutate().row(spNodeId + NODEID_BS_DELIM + bindingSetStr) - .col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) - .set(bindingSetStr); + tx.set(spNodeId + NODEID_BS_DELIM + bindingSetStr, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, bindingSetStr); } tx.commit(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java index e1a9b8e..79ca0ea 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java @@ -28,7 +28,6 @@ import java.util.List; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -43,7 +42,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; -import org.apache.fluo.recipes.core.types.TypedTransaction; /** * Deletes a Pre-computed Join (PCJ) from Fluo. @@ -174,7 +172,7 @@ public class DeletePcj { requireNonNull(nodeIds); requireNonNull(pcjId); - try (final TypedTransaction typeTx = new StringTypeLayer().wrap(tx)) { + try (final Transaction typeTx = tx) { deletePcjIdAndSparqlMetadata(typeTx, pcjId); for (final String nodeId : nodeIds) { @@ -192,7 +190,7 @@ public class DeletePcj { * @param nodeId - The Node ID of the query node to delete. (not null) * @param columns - The columns that will be deleted. (not null) */ - private void deleteMetadataColumns(final TypedTransaction tx, final String nodeId, final List<Column> columns) { + private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) { requireNonNull(tx); requireNonNull(columns); requireNonNull(nodeId); @@ -211,15 +209,15 @@ public class DeletePcj { * @param tx - Transaction the deletes will be performed with. (not null) * @param pcjId - The PCJ whose metadata will be deleted. (not null) */ - private void deletePcjIdAndSparqlMetadata(final TypedTransaction tx, final String pcjId) { + private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) { requireNonNull(tx); requireNonNull(pcjId); final String queryId = getQueryIdFromPcjId(tx, pcjId); final String sparql = getSparqlFromQueryId(tx, queryId); - tx.delete(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID); - tx.delete(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); - tx.delete(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID); + tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID); + tx.delete(sparql, FluoQueryColumns.QUERY_ID); + tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID); } @@ -253,17 +251,19 @@ public class DeletePcj { requireNonNull(scanner); requireNonNull(column); - int count = 0; - Iterator<RowColumnValue> iter = scanner.iterator(); - while (iter.hasNext() && count < batchSize) { + try(Transaction ntx = tx) { + int count = 0; + Iterator<RowColumnValue> iter = scanner.iterator(); + while (iter.hasNext() && count < batchSize) { final Bytes row = iter.next().getRow(); count++; tx.delete(row, column); - } + } - final boolean hasNext = iter.hasNext(); - tx.commit(); - return hasNext; + final boolean hasNext = iter.hasNext(); + tx.commit(); + return hasNext; + } } private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java index d8c800e..061a1d5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java @@ -24,15 +24,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.core.types.TypedSnapshot; /** * Get {@link PcjMetadata} for queries that are managed by the Fluo app. @@ -87,13 +86,12 @@ public class GetPcjMetadata { // Lookup the Rya PCJ ID associated with the query. String pcjId = null; - try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() ) ) { - final Bytes pcjIdBytes = snap.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID); - if(pcjIdBytes == null) { + try(Snapshot snap = fluo.newSnapshot() ) { + pcjId = snap.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + if(pcjId == null) { throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId + "' because a Rya PCJ ID not stored in the Fluo table."); } - pcjId = pcjIdBytes.toString(); } // Fetch the metadata from the storage. @@ -128,4 +126,4 @@ public class GetPcjMetadata { super(message, cause); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java index 1f45388..b184ff3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java @@ -25,15 +25,13 @@ import java.util.Collections; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import com.google.common.base.Optional; import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypedTransaction; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.data.Bytes; import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.triple.TripleRow; @@ -48,17 +46,10 @@ public class InsertTriples { private static final Logger log = Logger.getLogger(InsertTriples.class); /** - * Wraps Fluo {@link Transaction}s so that we can write String values to them. - */ - private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer(); - - /** * Converts triples into the byte[] used as the row ID in Accumulo. */ private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); - private static final Encoder ENCODER = new StringEncoder(); - // TODO visiblity is part of RyaStatement. Put it there instead. /** @@ -85,10 +76,10 @@ public class InsertTriples { checkNotNull(triples); checkNotNull(visibility); - try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) { + try(Transaction tx = fluo.newTransaction()) { for(final RyaStatement triple : triples) { try { - tx.mutate().row(spoFormat(triple)).col(FluoQueryColumns.TRIPLES).set(ENCODER.encode(visibility.or(""))); + tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(""))); } catch (final TripleRowResolverException e) { log.error("Could not convert a Triple into the SPO format: " + triple); } @@ -111,4 +102,4 @@ public class InsertTriples { final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); return spoRow.getRow(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java index 3913e41..df1648b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java @@ -24,13 +24,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.recipes.core.types.TypedSnapshot; /** * Finds all queries that are being managed by this instance of Fluo that @@ -51,7 +50,7 @@ public class ListQueryIds { final List<String> queryIds = new ArrayList<>(); - try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() )) { + try(Snapshot snap = fluo.newSnapshot() ) { // Create an iterator that iterates over the QUERY_ID column. final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_ID).build(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index 45ea9ce..f756cdb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -56,10 +56,6 @@ under the License. <artifactId>fluo-api</artifactId> </dependency> <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-core</artifactId> - </dependency> - <dependency> <groupId>org.apache.fluo</groupId> <artifactId>fluo-core</artifactId> <exclusions> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 328c653..b1af4fc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -52,8 +52,6 @@ import info.aduna.iteration.CloseableIteration; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; /** * Updates the results of a Filter node when its child has added a new Binding @@ -62,8 +60,6 @@ import org.apache.fluo.recipes.core.types.StringEncoder; @ParametersAreNonnullByDefault public class FilterResultUpdater { - private static final Encoder ENCODER = new StringEncoder(); - private static final BindingSetStringConverter ID_CONVERTER = new BindingSetStringConverter(); private static final VisibilityBindingSetStringConverter VALUE_CONVERTER = new VisibilityBindingSetStringConverter(); @@ -134,9 +130,9 @@ public class FilterResultUpdater { String filterBindingSetValueString = ""; filterBindingSetValueString = VALUE_CONVERTER.convert(childBindingSet, filterVarOrder); - final Bytes row = ENCODER.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetIdString ); + final String row = filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetIdString; final Column col = FluoQueryColumns.FILTER_BINDING_SET; - final Bytes value = ENCODER.encode(filterBindingSetValueString); + final String value = filterBindingSetValueString; tx.set(row, col, value); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java index 7ccfeff..513ab40 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java @@ -25,11 +25,11 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.UR import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.recipes.core.types.TypedTransaction; import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.triple.TripleRow; @@ -38,7 +38,6 @@ import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver; public class IncUpdateDAO { - private static final StringTypeLayer stl = new StringTypeLayer(); private static final WholeRowTripleResolver tr = new WholeRowTripleResolver(); public static RyaStatement deserializeTriple(final Bytes row) { @@ -77,7 +76,7 @@ public class IncUpdateDAO { */ public static void addRow(final FluoClient fluoClient, final String row, final Column col, final String val) { checkNotNull(fluoClient); - try (TypedTransaction tx = stl.wrap(fluoClient.newTransaction())) { + try (Transaction tx = fluoClient.newTransaction()) { addRow(tx, row, col, val); tx.commit(); } @@ -91,9 +90,9 @@ public class IncUpdateDAO { * @param col - The Column. * @param val - The value. */ - public static void addRow(final TypedTransaction tx, final String row, final Column col, final String val) { + public static void addRow(final Transaction tx, final String row, final Column col, final String val) { checkNotNull(tx); - tx.mutate().row(row).col(col).set(val); + tx.set(row, col, val); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java index 5ac69b0..73a03ca 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java @@ -49,12 +49,9 @@ import com.google.common.collect.Sets; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.Span; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; /** * Updates the results of a Join node when one of its children has added a @@ -67,8 +64,7 @@ public class JoinResultUpdater { private static final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); - private final Encoder encoder = new StringEncoder(); - + /** * Updates the results of a Join node when one of its children has added a * new Binding Set to its results. @@ -132,9 +128,9 @@ public class JoinResultUpdater { final String joinBindingSetStringId = idConverter.convert(newJoinResult, joinVarOrder); final String joinBindingSetStringValue = valueConverter.convert(newJoinResult, joinVarOrder); - final Bytes row = encoder.encode(joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId); + final String row = joinMetadata.getNodeId() + NODEID_BS_DELIM + joinBindingSetStringId; final Column col = FluoQueryColumns.JOIN_BINDING_SET; - final Bytes value = encoder.encode(joinBindingSetStringValue); + final String value = joinBindingSetStringValue; tx.set(row, col, value); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java index 41f9025..b4800fc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/QueryResultUpdater.java @@ -33,10 +33,7 @@ import org.openrdf.query.Binding; import org.openrdf.query.impl.MapBindingSet; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; /** * Updates the results of a Query node when one of its children has added a @@ -44,8 +41,7 @@ import org.apache.fluo.recipes.core.types.StringEncoder; */ @ParametersAreNonnullByDefault public class QueryResultUpdater { - private final Encoder encoder = new StringEncoder(); - + private final BindingSetStringConverter converter = new BindingSetStringConverter(); private final VisibilityBindingSetStringConverter valueConverter = new VisibilityBindingSetStringConverter(); @@ -79,9 +75,9 @@ public class QueryResultUpdater { final String queryBindingSetValueString = valueConverter.convert(new VisibilityBindingSet(queryBindingSet, childBindingSet.getVisibility()), queryVarOrder); // Commit it to the Fluo table for the SPARQL query. This isn't guaranteed to be a new entry. - final Bytes row = encoder.encode(queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString); + final String row = queryMetadata.getNodeId() + NODEID_BS_DELIM + queryBindingSetString; final Column col = FluoQueryColumns.QUERY_BINDING_SET; - final Bytes value = encoder.encode(queryBindingSetValueString); + final String value = queryBindingSetValueString; tx.set(row, col, value); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java deleted file mode 100644 index aecb434..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/StringTypeLayer.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app; - -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypeLayer; - -public class StringTypeLayer extends TypeLayer { - - public StringTypeLayer() { - super(new StringEncoder()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java index a7f016d..ecc39bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java @@ -20,9 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.app.export; import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.fluo.api.client.TransactionBase; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; /** * Exports a single Binding Set that is a new result for a SPARQL query to some @@ -40,7 +40,7 @@ public interface IncrementalResultExporter { * Fluo application. (not null) * @throws ResultExportException The result could not be exported. */ - public void export(TypedTransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; + public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; /** * A result could not be exported. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java index 27530f0..a4b589f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java @@ -27,9 +27,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - +import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; /** * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya. @@ -49,14 +48,14 @@ public class RyaResultExporter implements IncrementalResultExporter { @Override public void export( - final TypedTransactionBase fluoTx, + final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { checkNotNull(fluoTx); checkNotNull(queryId); checkNotNull(result); - final String pcjId = fluoTx.get(Bytes.of(queryId), FluoQueryColumns.RYA_PCJ_ID).toString(); + final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); try { pcjStorage.addResults(pcjId, Collections.singleton(result)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java index e344b0a..a2953cf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java @@ -37,10 +37,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypedObserver; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; +import org.apache.fluo.api.observer.AbstractObserver; /** * Notified when the results of a node have been updated to include a new Binding @@ -48,9 +45,8 @@ import org.apache.fluo.recipes.core.types.TypedTransactionBase; * results. */ @ParametersAreNonnullByDefault -public abstract class BindingSetUpdater extends TypedObserver { +public abstract class BindingSetUpdater extends AbstractObserver { - private final Encoder encoder = new StringEncoder(); // DAO private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO(); @@ -72,12 +68,12 @@ public abstract class BindingSetUpdater extends TypedObserver { public abstract Observation parseObservation(TransactionBase tx, final BindingSetRow parsedRow); @Override - public final void process(final TypedTransactionBase tx, final Bytes row, final Column col) { + public final void process(final TransactionBase tx, final Bytes row, final Column col) { checkNotNull(tx); checkNotNull(row); checkNotNull(col); - final String bindingSetString = encoder.decodeString(tx.get(row, col)); + final String bindingSetString = tx.get(row, col).toString(); final Observation observation = parseObservation( tx, new BindingSetRow(BindingSetRow.make(row).getNodeId(), bindingSetString) ); final String observedNodeId = observation.getObservedNodeId(); final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index 638b1fc..0944f9b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -38,23 +38,19 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; - +import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypedObserver; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; +import org.apache.fluo.api.observer.AbstractObserver; import mvm.rya.accumulo.utils.VisibilitySimplifier; /** * Performs incremental result exporting to the configured destinations. */ -public class QueryResultObserver extends TypedObserver { +public class QueryResultObserver extends AbstractObserver { private static final Logger log = Logger.getLogger(QueryResultObserver.class); private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); - private static final Encoder ENCODER = new StringEncoder(); private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); /** @@ -107,11 +103,13 @@ public class QueryResultObserver extends TypedObserver { } @Override - public void process(final TypedTransactionBase tx, final Bytes row, final Column col) { + public void process(final TransactionBase tx, final Bytes brow, final Column col) { + final String row = brow.toString(); + // Read the SPARQL query and it Binding Set from the row id. - final String[] queryAndBindingSet = ENCODER.decodeString(row).split(NODEID_BS_DELIM); + final String[] queryAndBindingSet = row.split(NODEID_BS_DELIM); final String queryId = queryAndBindingSet[0]; - final String bindingSetString = ENCODER.decodeString(tx.get(row, col)); + final String bindingSetString = tx.gets(row, col); // Fetch the query's Variable Order from the Fluo table. final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java index 31a4c29..70f1cbd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java @@ -26,7 +26,6 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VA import java.util.Map; import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; @@ -35,32 +34,26 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import com.google.common.collect.Maps; - +import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.Span; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; -import org.apache.fluo.recipes.core.types.TypedObserver; -import org.apache.fluo.recipes.core.types.TypedTransactionBase; +import org.apache.fluo.api.observer.AbstractObserver; /** * An observer that matches new Triples to the Statement Patterns that are part * of any PCJ that is being maintained. If the triple matches a pattern, then * the new result is stored as a binding set for the pattern. */ -public class TripleObserver extends TypedObserver { +public class TripleObserver extends AbstractObserver { - private static final Encoder ENCODER = new StringEncoder(); private static final FluoQueryMetadataDAO QUERY_DAO = new FluoQueryMetadataDAO(); private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); - public TripleObserver() { - super(new StringTypeLayer()); - } + public TripleObserver() {} @Override public ObservedColumn getObservedColumn() { @@ -68,15 +61,12 @@ public class TripleObserver extends TypedObserver { } @Override - public void process(final TypedTransactionBase tx, final Bytes row, final Column column) { + public void process(final TransactionBase tx, final Bytes brow, final Column column) { //get string representation of triple - final String triple = IncUpdateDAO.getTripleString(row); - final Bytes visiBytes = tx.get(row, FluoQueryColumns.TRIPLES); - String visibility = ""; - if(visiBytes != null) { - visibility = ENCODER.decodeString(visiBytes); - } - + String row = brow.toString(); + final String triple = IncUpdateDAO.getTripleString(brow); + String visibility = tx.gets(row, FluoQueryColumns.TRIPLES, ""); + //get variable metadata for all SP in table RowScanner rscanner = tx.scanner().over(Span.prefix(SP_PREFIX)).fetch(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER).byRow().build(); @@ -100,7 +90,7 @@ public class TripleObserver extends TypedObserver { CONVERTER.convert(bindingSetString, varOrder), visibility); final String valueString = CONVERTER.convert(bindingSet, varOrder); - tx.mutate().row(spID + NODEID_BS_DELIM + bindingSetString).col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).set(valueString); + tx.set(spID + NODEID_BS_DELIM + bindingSetString, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueString); } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index a955a53..8d41c61 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -34,8 +34,6 @@ import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.types.Encoder; -import org.apache.fluo.recipes.core.types.StringEncoder; /** * Reads and writes {@link FluoQuery} instances and their components to/from @@ -44,8 +42,6 @@ import org.apache.fluo.recipes.core.types.StringEncoder; @ParametersAreNonnullByDefault public class FluoQueryMetadataDAO { - private static final Encoder encoder = new StringEncoder(); - /** * Write an instance of {@link QueryMetadata} to the Fluo table. * @@ -56,11 +52,11 @@ public class FluoQueryMetadataDAO { checkNotNull(tx); checkNotNull(metadata); - final Bytes rowId = encoder.encode(metadata.getNodeId()); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.QUERY_NODE_ID, rowId); - tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() )); - tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, encoder.encode( metadata.getSparql() )); - tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() )); + tx.set(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.QUERY_SPARQL, metadata.getSparql() ); + tx.set(rowId, FluoQueryColumns.QUERY_CHILD_NODE_ID, metadata.getChildNodeId() ); } /** @@ -79,18 +75,18 @@ public class FluoQueryMetadataDAO { checkNotNull(nodeId); // Fetch the values from the Fluo table. - final Bytes rowId = encoder.encode(nodeId); - final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet( + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.QUERY_VARIABLE_ORDER, FluoQueryColumns.QUERY_SPARQL, - FluoQueryColumns.QUERY_CHILD_NODE_ID)); + FluoQueryColumns.QUERY_CHILD_NODE_ID); // Return an object holding them. - final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.QUERY_VARIABLE_ORDER)); + final String varOrderString = values.get(FluoQueryColumns.QUERY_VARIABLE_ORDER); final VariableOrder varOrder = new VariableOrder(varOrderString); - final String sparql = encoder.decodeString( values.get(FluoQueryColumns.QUERY_SPARQL) ); - final String childNodeId = encoder.decodeString( values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID) ); + final String sparql = values.get(FluoQueryColumns.QUERY_SPARQL); + final String childNodeId = values.get(FluoQueryColumns.QUERY_CHILD_NODE_ID); return QueryMetadata.builder(nodeId) .setVariableOrder( varOrder ) @@ -108,13 +104,13 @@ public class FluoQueryMetadataDAO { checkNotNull(tx); checkNotNull(metadata); - final Bytes rowId = encoder.encode(metadata.getNodeId()); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId); - tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() )); - tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, encoder.encode( metadata.getOriginalSparql() )); - tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, encoder.encode( metadata.getFilterIndexWithinSparql() )); - tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() )); - tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, encoder.encode( metadata.getChildNodeId() )); + tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, metadata.getOriginalSparql() ); + tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, metadata.getFilterIndexWithinSparql()+"" ); + tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, metadata.getParentNodeId() ); + tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, metadata.getChildNodeId() ); } /** @@ -133,22 +129,22 @@ public class FluoQueryMetadataDAO { checkNotNull(nodeId); // Fetch the values from the Fluo table. - final Bytes rowId = encoder.encode(nodeId); - final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet( + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, FluoQueryColumns.FILTER_PARENT_NODE_ID, - FluoQueryColumns.FILTER_CHILD_NODE_ID)); + FluoQueryColumns.FILTER_CHILD_NODE_ID); // Return an object holding them. - final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER)); + final String varOrderString = values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER); final VariableOrder varOrder = new VariableOrder(varOrderString); - final String originalSparql = encoder.decodeString( values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL) ); - final int filterIndexWithinSparql = encoder.decodeInteger( values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL) ); - final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID) ); - final String childNodeId = encoder.decodeString( values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID) ); + final String originalSparql = values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL); + final int filterIndexWithinSparql = Integer.parseInt(values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL)); + final String parentNodeId = values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID); + final String childNodeId = values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID); return FilterMetadata.builder(nodeId) .setVarOrder(varOrder) @@ -168,13 +164,13 @@ public class FluoQueryMetadataDAO { checkNotNull(tx); checkNotNull(metadata); - final Bytes rowId = encoder.encode(metadata.getNodeId()); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.JOIN_NODE_ID, rowId); - tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() )); - tx.set(rowId, FluoQueryColumns.JOIN_TYPE, encoder.encode(metadata.getJoinType().toString()) ); - tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() )); - tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, encoder.encode( metadata.getLeftChildNodeId() )); - tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, encoder.encode( metadata.getRightChildNodeId() )); + tx.set(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.JOIN_TYPE, metadata.getJoinType().toString() ); + tx.set(rowId, FluoQueryColumns.JOIN_PARENT_NODE_ID, metadata.getParentNodeId() ); + tx.set(rowId, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, metadata.getLeftChildNodeId() ); + tx.set(rowId, FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID, metadata.getRightChildNodeId() ); } /** @@ -193,24 +189,24 @@ public class FluoQueryMetadataDAO { checkNotNull(nodeId); // Fetch the values from the Fluo table. - final Bytes rowId = encoder.encode(nodeId); - final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet( + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.JOIN_VARIABLE_ORDER, FluoQueryColumns.JOIN_TYPE, FluoQueryColumns.JOIN_PARENT_NODE_ID, FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID, - FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID)); + FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID); // Return an object holding them. - final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER)); + final String varOrderString = values.get(FluoQueryColumns.JOIN_VARIABLE_ORDER); final VariableOrder varOrder = new VariableOrder(varOrderString); - final String joinTypeString = encoder.decodeString( values.get(FluoQueryColumns.JOIN_TYPE) ); + final String joinTypeString = values.get(FluoQueryColumns.JOIN_TYPE); final JoinType joinType = JoinType.valueOf(joinTypeString); - final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID) ); - final String leftChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID) ); - final String rightChildNodeId = encoder.decodeString( values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID) ); + final String parentNodeId = values.get(FluoQueryColumns.JOIN_PARENT_NODE_ID); + final String leftChildNodeId = values.get(FluoQueryColumns.JOIN_LEFT_CHILD_NODE_ID); + final String rightChildNodeId = values.get(FluoQueryColumns.JOIN_RIGHT_CHILD_NODE_ID); return JoinMetadata.builder(nodeId) .setVariableOrder(varOrder) @@ -230,11 +226,11 @@ public class FluoQueryMetadataDAO { checkNotNull(tx); checkNotNull(metadata); - final Bytes rowId = encoder.encode(metadata.getNodeId()); + final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_NODE_ID, rowId); - tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, encoder.encode( metadata.getVariableOrder().toString() )); - tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, encoder.encode( metadata.getStatementPattern() )); - tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, encoder.encode( metadata.getParentNodeId() )); + tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, metadata.getStatementPattern() ); + tx.set(rowId, FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID, metadata.getParentNodeId()); } /** @@ -253,18 +249,18 @@ public class FluoQueryMetadataDAO { checkNotNull(nodeId); // Fetch the values from the Fluo table. - final Bytes rowId = encoder.encode(nodeId); - final Map<Column, Bytes> values = sx.get(rowId, Sets.newHashSet( + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER, FluoQueryColumns.STATEMENT_PATTERN_PATTERN, - FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID)); + FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID); // Return an object holding them. - final String varOrderString = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER)); + final String varOrderString = values.get(FluoQueryColumns.STATEMENT_PATTERN_VARIABLE_ORDER); final VariableOrder varOrder = new VariableOrder(varOrderString); - final String pattern = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN) ); - final String parentNodeId = encoder.decodeString( values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID) ); + final String pattern = values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN); + final String parentNodeId = values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID); return StatementPatternMetadata.builder(nodeId) .setVarOrder(varOrder) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java index 99e2191..19bc272 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -27,12 +27,11 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.rya.indexing.pcj.fluo.ITBase; -import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.junit.Test; import com.beust.jcommander.internal.Lists; -import org.apache.fluo.recipes.core.types.TypedTransaction; +import org.apache.fluo.api.client.Transaction; /** * Integration tests the methods of {@link ListQueryIds}. @@ -47,11 +46,11 @@ public class ListQueryIdsIT extends ITBase { @Test public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException { // Store a few SPARQL/Query ID pairs in the Fluo table. - try(TypedTransaction tx = new StringTypeLayer().wrap( fluoClient.newTransaction() )) { - tx.mutate().row("SPARQL_3").col(QUERY_ID).set("ID_3"); - tx.mutate().row("SPARQL_1").col(QUERY_ID).set("ID_1"); - tx.mutate().row("SPARQL_4").col(QUERY_ID).set("ID_4"); - tx.mutate().row("SPARQL_2").col(QUERY_ID).set("ID_2"); + try(Transaction tx = fluoClient.newTransaction()) { + tx.set("SPARQL_3", QUERY_ID, "ID_3"); + tx.set("SPARQL_1", QUERY_ID, "ID_1"); + tx.set("SPARQL_4", QUERY_ID, "ID_4"); + tx.set("SPARQL_2", QUERY_ID, "ID_2"); tx.commit(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/177c80a1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6e280c8..6ca4505 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,6 @@ under the License. <maven.min-version>3.0.4</maven.min-version> <fluo.version>1.0.0-incubating</fluo.version> - <fluo-recipes.version>1.0.0-incubating-SNAPSHOT</fluo-recipes.version> <jmh.version>1.13</jmh.version> <jsr305.version>3.0.1</jsr305.version> @@ -524,11 +523,6 @@ under the License. <artifactId>fluo-mini</artifactId> <version>${fluo.version}</version> </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-core</artifactId> - <version>${fluo-recipes.version}</version> - </dependency> <dependency> <groupId>org.mockito</groupId>