Repository: incubator-rya Updated Branches: refs/heads/master 5015f5945 -> bd4552582
RYA-165 Closes #90. Simplify PCJ result visibility expressions before exporting them to the Rya PCJ index table. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/bd455258 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/bd455258 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/bd455258 Branch: refs/heads/master Commit: bd45525822d04328c59ce4aae6bd76c027ebeada Parents: 5015f59 Author: Kevin Chilton <[email protected]> Authored: Mon Sep 12 17:05:51 2016 -0400 Committer: pujav65 <[email protected]> Committed: Tue Sep 27 11:46:44 2016 -0400 ---------------------------------------------------------------------- .../accumulo/utils/VisibilitySimplifier.java | 54 ++++++ .../utils/VisibilitySimplifierTest.java | 53 +++++ .../storage/accumulo/VisibilityBindingSet.java | 28 ++- .../VisibilityBindingSetStringConverter.java | 40 ++-- ...VisibilityBindingSetStringConverterTest.java | 5 +- .../fluo/app/observers/QueryResultObserver.java | 27 ++- .../apache/rya/indexing/pcj/fluo/ITBase.java | 193 +++++++------------ .../pcj/fluo/api/CountStatementsIT.java | 4 +- .../pcj/fluo/integration/RyaExportIT.java | 21 -- .../pcj/fluo/visibility/PcjVisibilityIT.java | 125 ++++++++++-- 10 files changed, 361 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java new file mode 100644 index 0000000..cc4edca --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.accumulo.utils; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.security.ColumnVisibility; + +import com.google.common.base.Charsets; + +/** + * Simplifies Accumulo visibility expressions. + */ +@ParametersAreNonnullByDefault +public class VisibilitySimplifier { + + /** + * Simplifies an Accumulo visibility expression. + * + * @param visibility - The expression to simplify. (not null) + * @return A simplified form of {@code visibility}. + */ + public String simplify(final String visibility) { + requireNonNull(visibility); + + String last = visibility; + String simplified = new String(new ColumnVisibility(visibility).flatten(), Charsets.UTF_8); + + while(!simplified.equals(last)) { + last = simplified; + simplified = new String(new ColumnVisibility(simplified).flatten(), Charsets.UTF_8); + } + + return simplified; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/utils/VisibilitySimplifierTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/utils/VisibilitySimplifierTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/utils/VisibilitySimplifierTest.java new file mode 100644 index 0000000..3d22e2f --- /dev/null +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/utils/VisibilitySimplifierTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.accumulo.utils; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests the methods of {@link VisibilitySimplifier}. + */ +public class VisibilitySimplifierTest { + + @Test + public void noneRequired() { + final String simplified = new VisibilitySimplifier().simplify("u"); + assertEquals("u", simplified); + } + + @Test + public void parenthesis() { + final String simplified = new VisibilitySimplifier().simplify("(u&u)&u"); + assertEquals("u", simplified); + } + + @Test + public void manyAnds() { + final String simplified = new VisibilitySimplifier().simplify("u&u&u"); + assertEquals("u", simplified); + } + + @Test + public void complex() { + final String simplified = new VisibilitySimplifier().simplify("(a|b)|(a|b)|a|b"); + assertEquals("a|b", simplified); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java index b9f4a1f..1a0bbc5 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSet.java @@ -18,7 +18,7 @@ */ package org.apache.rya.indexing.pcj.storage.accumulo; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import javax.annotation.ParametersAreNonnullByDefault; @@ -30,28 +30,40 @@ import org.openrdf.query.BindingSet; @ParametersAreNonnullByDefault public class VisibilityBindingSet extends BindingSetDecorator { private static final long serialVersionUID = 1L; - private final String visibility; + private String visibility; private volatile int hashCode; /** - * @param set - Decorates the {@link BindingSet} with no visibilities. + * Creates a new {@link VisibilityBindingSet} that does not have any visibilities + * associated with it. + * + * @param set - Decorates the {@link BindingSet} with no visibilities. (not null) */ public VisibilityBindingSet(final BindingSet set) { this(set, ""); } /** - * Creates a new {@link VisibilityBindingSet} - * @param set - The {@link BindingSet} to decorate - * @param visibility - The visibilities on the {@link BindingSet} (not null) + * Creates a new {@link VisibilityBindingSet}. + * + * @param set - The {@link BindingSet} to decorate. (not null) + * @param visibility - The Visibilities on the {@link BindingSet}. (not null) */ public VisibilityBindingSet(final BindingSet set, final String visibility) { super(set); - this.visibility = checkNotNull(visibility); + this.visibility = requireNonNull(visibility); } /** - * @return - The Visibilities on the {@link BindingSet} + * @param visibility - The Visibilities on the {@link BindingSet}. (not null) + */ + public void setVisibility(final String visibility) { + requireNonNull(visibility); + this.visibility = visibility; + } + + /** + * @return The Visibilities on the {@link BindingSet}. */ public String getVisibility() { return visibility; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java index 8ff01ac..307d0b4 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java @@ -22,8 +22,6 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.openrdf.query.BindingSet; -import com.google.common.base.Strings; - /** * Converts {@link BindingSet}s to Strings and back again. The Strings do not * include the binding names and are ordered with a {@link VariableOrder}. @@ -32,28 +30,36 @@ import com.google.common.base.Strings; public class VisibilityBindingSetStringConverter extends BindingSetStringConverter { public static final char VISIBILITY_DELIM = 1; + private static final int BINDING_SET_STRING_INDEX = 0; + private static final int VISIBILITY_EXPRESSION_INDEX = 1; + @Override public String convert(final BindingSet bindingSet, final VariableOrder varOrder) { - String visibility = ""; + // Convert the BindingSet into its String format. + String bindingSetString = super.convert(bindingSet, varOrder); + + // Append the visibilities if they are present. if(bindingSet instanceof VisibilityBindingSet) { - final VisibilityBindingSet visiSet = (VisibilityBindingSet) bindingSet; - if(!Strings.isNullOrEmpty(visiSet.getVisibility())) { - visibility = VISIBILITY_DELIM + visiSet.getVisibility(); + final String visibility = ((VisibilityBindingSet) bindingSet).getVisibility(); + if(!visibility.isEmpty()) { + bindingSetString += VISIBILITY_DELIM + visibility; } } - return super.convert(bindingSet, varOrder) + visibility; + + return bindingSetString; } @Override - public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) { - final String[] visiStrings = bindingSetString.split("" + VISIBILITY_DELIM); - BindingSet bindingSet = super.convert(visiStrings[0], varOrder); - - if(visiStrings.length > 1) { - bindingSet = new VisibilityBindingSet(bindingSet, visiStrings[1]); - } else { - bindingSet = new VisibilityBindingSet(bindingSet); - } - return bindingSet; + public VisibilityBindingSet convert(final String bindingSetString, final VariableOrder varOrder) { + // Try to split the binding set string over the visibility delimiter. + final String[] strings = bindingSetString.split("" + VISIBILITY_DELIM); + + // Convert the binding set string into a BindingSet. + final BindingSet bindingSet = super.convert(strings[BINDING_SET_STRING_INDEX], varOrder); + + // If a visibility expression is present, then also include it. + return (strings.length > 1) ? + new VisibilityBindingSet(bindingSet, strings[VISIBILITY_EXPRESSION_INDEX]) : + new VisibilityBindingSet(bindingSet); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverterTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverterTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverterTest.java index 9adb8f2..fffe47f 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverterTest.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverterTest.java @@ -22,10 +22,6 @@ import static org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetS import static org.junit.Assert.assertEquals; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter; import org.junit.Test; import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; @@ -35,6 +31,7 @@ import org.openrdf.query.impl.MapBindingSet; * Tests the methods of {@link BindingSetStringConverter}. */ public class VisibilityBindingSetStringConverterTest { + @Test public void toString_URIs() throws BindingSetConversionException { // Setup the binding set that will be converted. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index bc9da29..2f7a1ea 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -20,6 +20,9 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import java.util.HashMap; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException; @@ -42,6 +45,7 @@ import io.fluo.api.types.Encoder; import io.fluo.api.types.StringEncoder; import io.fluo.api.types.TypedObserver; import io.fluo.api.types.TypedTransactionBase; +import mvm.rya.accumulo.utils.VisibilitySimplifier; /** * Performs incremental result exporting to the configured destinations. @@ -54,6 +58,16 @@ public class QueryResultObserver extends TypedObserver { private static final VisibilityBindingSetStringConverter CONVERTER = new VisibilityBindingSetStringConverter(); /** + * Simplifies Visibility expressions prior to exporting PCJ results. + */ + private static final VisibilitySimplifier SIMPLIFIER = new VisibilitySimplifier(); + + /** + * We expect to see the same expressions a lot, so we cache the simplified forms. + */ + private final Map<String, String> simplifiedVisibilities = new HashMap<>(); + + /** * Builders for each type of result exporter we support. */ private static final ImmutableSet<IncrementalResultExporterFactory> factories = @@ -103,8 +117,19 @@ public class QueryResultObserver extends TypedObserver { final QueryMetadata queryMetadata = QUERY_DAO.readQueryMetadata(tx, queryId); final VariableOrder varOrder = queryMetadata.getVariableOrder(); + // Create the result that will be exported. + final VisibilityBindingSet result = CONVERTER.convert(bindingSetString, varOrder); + + // Simplify the result's visibilities. + final String visibility = result.getVisibility(); + if(!simplifiedVisibilities.containsKey(visibility)) { + final String simplified = SIMPLIFIER.simplify( visibility ); + simplifiedVisibilities.put(visibility, simplified); + } + + result.setVisibility( simplifiedVisibilities.get(visibility) ); + // Export the result using each of the provided exporters. - final VisibilityBindingSet result = (VisibilityBindingSet) CONVERTER.convert(bindingSetString, varOrder); for(final IncrementalResultExporter exporter : exporters) { try { exporter.export(tx, queryId, result); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index 93968e4..0b24abc 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -22,13 +22,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.File; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -42,6 +39,7 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.minicluster.MiniAccumuloConfig; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; @@ -62,11 +60,8 @@ import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryException; import org.openrdf.sail.Sail; -import org.openrdf.sail.SailException; -import com.google.common.base.Optional; import com.google.common.io.Files; import io.fluo.api.client.FluoAdmin; @@ -83,27 +78,18 @@ import io.fluo.api.iterator.ColumnIterator; import io.fluo.api.iterator.RowIterator; import io.fluo.api.mini.MiniFluo; import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.RyaClient; +import mvm.rya.api.client.accumulo.AccumuloConnectionDetails; +import mvm.rya.api.client.accumulo.AccumuloRyaClientFactory; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; -import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; -import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; -import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.ProspectorDetails; -import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import mvm.rya.api.persist.RyaDAOException; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.indexing.accumulo.ConfigUtils; import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; import mvm.rya.rdftriplestore.RyaSailRepository; -import mvm.rya.rdftriplestore.inference.InferenceEngineException; import mvm.rya.sail.config.RyaSailFactory; /** @@ -115,26 +101,23 @@ import mvm.rya.sail.config.RyaSailFactory; public abstract class ITBase { private static final Logger log = Logger.getLogger(ITBase.class); - protected static final String RYA_INSTANCE_NAME = "demo_"; - - protected static final String ACCUMULO_USER = "root"; - protected static final String ACCUMULO_PASSWORD = "password"; - // Rya data store and connections. + protected static final String RYA_INSTANCE_NAME = "demo_"; protected RyaSailRepository ryaRepo = null; protected RepositoryConnection ryaConn = null; - // Mini Accumulo Cluster + protected static final String ACCUMULO_USER = "root"; + protected static final String ACCUMULO_PASSWORD = "password"; protected MiniAccumuloCluster cluster; protected static Connector accumuloConn = null; protected String instanceName = null; protected String zookeepers = null; // Fluo data store and connections. + protected static final String FLUO_APP_NAME = "IntegrationTests"; protected MiniFluo fluo = null; protected FluoClient fluoClient = null; - protected final String appName = "IntegrationTests"; @BeforeClass public static void killLoudLogs() { @@ -142,9 +125,7 @@ public abstract class ITBase { } @Before - public void setupMiniResources() - throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException, - RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException, AlreadyInitializedException, RyaDetailsRepositoryException, SailException { + public void setupMiniResources() throws Exception { // Initialize the Mini Accumulo that will be used to host Rya and Fluo. setupMiniAccumulo(); @@ -153,7 +134,7 @@ public abstract class ITBase { fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); // Initialize the Rya that will be used by the tests. - ryaRepo = setupRya(ACCUMULO_USER, ACCUMULO_PASSWORD, instanceName, zookeepers, appName); + ryaRepo = setupRya(instanceName, zookeepers); ryaConn = ryaRepo.getConnection(); } @@ -214,8 +195,7 @@ public abstract class ITBase { * A helper fuction for creating a {@link BindingSet} from an array of * {@link Binding}s. * - * @param bindings - * - The bindings to include in the set. (not null) + * @param bindings - The bindings to include in the set. (not null) * @return A {@link BindingSet} holding the bindings. */ protected static BindingSet makeBindingSet(final Binding... bindings) { @@ -230,12 +210,9 @@ public abstract class ITBase { * A helper function for creating a {@link RyaStatement} that represents a * Triple. * - * @param subject - * - The Subject of the Triple. (not null) - * @param predicate - * - The Predicate of the Triple. (not null) - * @param object - * - The Object of the Triple. (not null) + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) * @return A Triple as a {@link RyaStatement}. */ protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) { @@ -259,12 +236,9 @@ public abstract class ITBase { * A helper function for creating a {@link RyaStatement} that represents a * Triple. * - * @param subject - * - The Subject of the Triple. (not null) - * @param predicate - * - The Predicate of the Triple. (not null) - * @param object - * - The Object of the Triple. (not null) + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) * @return A Triple as a {@link RyaStatement}. */ protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) { @@ -279,12 +253,9 @@ public abstract class ITBase { * A helper function for creating a Sesame {@link Statement} that represents * a Triple.. * - * @param subject - * - The Subject of the Triple. (not null) - * @param predicate - * - The Predicate of the Triple. (not null) - * @param object - * - The Object of the Triple. (not null) + * @param subject - The Subject of the Triple. (not null) + * @param predicate - The Predicate of the Triple. (not null) + * @param object - The Object of the Triple. (not null) * @return A Triple as a {@link Statement}. */ protected static Statement makeStatement(final String subject, final String predicate, final String object) { @@ -297,14 +268,10 @@ public abstract class ITBase { } /** - * Fetches the binding sets that are the results of a specific SPARQL query - * from the Fluo table. + * Fetches the binding sets that are the results of a specific SPARQL query from the Fluo table. * - * @param fluoClient- - * A connection to the Fluo table where the results reside. (not - * null) - * @param sparql - * - This query's results will be fetched. (not null) + * @param fluoClient- A connection to the Fluo table where the results reside. (not null) + * @param sparql - This query's results will be fetched. (not null) * @return The binding sets for the query's results. */ protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) { @@ -354,77 +321,59 @@ public abstract class ITBase { /** * Sets up a Rya instance. - * @throws SailException */ - protected static RyaSailRepository setupRya(final String user, final String password, final String instanceName, final String zookeepers, final String appName) - throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, - NumberFormatException, UnknownHostException, InferenceEngineException, mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException, RyaDetailsRepositoryException, SailException { - - checkNotNull(user); - checkNotNull(password); + protected static RyaSailRepository setupRya(final String instanceName, final String zookeepers) throws Exception { checkNotNull(instanceName); checkNotNull(zookeepers); - checkNotNull(appName); - // Setup Rya configuration values. + // Install the Rya instance to the mini accumulo cluster. + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), accumuloConn); + + ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableFreeTextIndex(true) + .setEnableEntityCentricIndex(true) + .setEnableGeoIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setFluoPcjAppName(FLUO_APP_NAME) + .build()); + + // Connect to the Rya instance that was just installed. + final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); + final Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); + return ryaRepo; + } + + protected static AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setTablePrefix(RYA_INSTANCE_NAME); - conf.setDisplayQueryPlan(true); - conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); - conf.set(ConfigUtils.CLOUDBASE_USER, user); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, password); + // Accumulo connection information. + conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD); conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + // PCJ configuration information. conf.set(ConfigUtils.USE_PCJ, "true"); conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true"); - conf.set(ConfigUtils.FLUO_APP_NAME, appName); + conf.set(ConfigUtils.FLUO_APP_NAME, FLUO_APP_NAME); conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); - - final Sail sail = RyaSailFactory.getInstance(conf); - final RyaSailRepository ryaRepo = new RyaSailRepository(sail); - - // Initialize the Rya Details. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, RYA_INSTANCE_NAME); - - final RyaDetails details = RyaDetails.builder() - .setRyaInstanceName(RYA_INSTANCE_NAME) - .setRyaVersion("0.0.0.0") - .setFreeTextDetails( new FreeTextIndexDetails(true) ) - .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) - .setGeoIndexDetails( new GeoIndexDetails(true) ) - .setTemporalIndexDetails( new TemporalIndexDetails(true) ) - .setPCJIndexDetails( - PCJIndexDetails.builder() - .setEnabled(true) ) - .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.<Date>absent() ) ) - .setProspectorDetails( new ProspectorDetails( Optional.<Date>absent() )) - .build(); - - detailsRepo.initialize(details); - - return ryaRepo; - } + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - /** - * Override this method to provide an output configuration to the Fluo - * application. - * <p> - * Returns an empty map by default. - * - * @return The parameters that will be passed to {@link QueryResultObserver} - * at startup. - */ - protected Map<String, String> makeExportParams() { - return new HashMap<>(); + return conf; } /** - * Setup a Mini Fluo cluster that uses a temporary directory to store its - * data. + * Setup a Mini Fluo cluster that uses a temporary directory to store its data. * * @return A Mini Fluo cluster. */ @@ -436,11 +385,19 @@ public abstract class ITBase { observers.add(new ObserverConfiguration(JoinObserver.class.getName())); observers.add(new ObserverConfiguration(FilterObserver.class.getName())); - // Provide export parameters child test classes may provide to the - // export observer. - final ObserverConfiguration exportObserverConfig = new ObserverConfiguration( - QueryResultObserver.class.getName()); - exportObserverConfig.setParameters(makeExportParams()); + // Configure the export observer to export new PCJ results to the mini accumulo cluster. + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName()); + + final HashMap<String, String> exportParams = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); + ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); + + exportObserverConfig.setParameters(exportParams); observers.add(exportObserverConfig); // Configure how the mini fluo will run. @@ -452,8 +409,8 @@ public abstract class ITBase { config.setInstanceZookeepers(zookeepers + "/fluo"); config.setAccumuloZookeepers(zookeepers); - config.setApplicationName(appName); - config.setAccumuloTable("fluo" + appName); + config.setApplicationName(FLUO_APP_NAME); + config.setAccumuloTable("fluo" + FLUO_APP_NAME); config.addObservers(observers); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 41a4b3d..b8e068c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -64,8 +64,8 @@ public class CountStatementsIT extends ITBase { config.setInstanceZookeepers(zookeepers + "/fluo"); config.setAccumuloZookeepers(zookeepers); - config.setApplicationName(appName); - config.setAccumuloTable("fluo" + appName); + config.setApplicationName(FLUO_APP_NAME); + config.setAccumuloTable("fluo" + FLUO_APP_NAME); config.addObservers(observers); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 2ea2202..d173e82 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -20,15 +20,12 @@ package org.apache.rya.indexing.pcj.fluo.integration; import static org.junit.Assert.assertEquals; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; @@ -48,24 +45,6 @@ import mvm.rya.api.domain.RyaStatement; */ public class RyaExportIT extends ITBase { - /** - * Configure the export observer to use the Mini Accumulo instance as the - * export destination for new PCJ results. - */ - @Override - protected Map<String, String> makeExportParams() { - final HashMap<String, String> params = new HashMap<>(); - - final RyaExportParameters ryaParams = new RyaExportParameters(params); - ryaParams.setExportToRya(true); - ryaParams.setAccumuloInstanceName(instanceName); - ryaParams.setZookeeperServers(zookeepers); - ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); - ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); - return params; - } - @Test public void resultsExported() throws Exception { final String sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bd455258/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index b23b4a4..79e1b84 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -24,50 +24,138 @@ import static org.junit.Assert.assertTrue; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; +import org.apache.hadoop.io.Text; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.junit.Test; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.sail.Sail; import com.beust.jcommander.internal.Sets; import com.google.common.base.Optional; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.client.RyaClient; +import mvm.rya.api.client.accumulo.AccumuloConnectionDetails; +import mvm.rya.api.client.accumulo.AccumuloRyaClientFactory; import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.rdftriplestore.RyaSailRepository; +import mvm.rya.sail.config.RyaSailFactory; +/** + * Integration tests that ensure the Fluo Application properly exports PCJ + * results with the correct Visibility values. + */ public class PcjVisibilityIT extends ITBase { - /** - * Configure the export observer to use the Mini Accumulo instance as the - * export destination for new PCJ results. - */ - @Override - protected Map<String, String> makeExportParams() { - final HashMap<String, String> params = new HashMap<>(); - - final RyaExportParameters ryaParams = new RyaExportParameters(params); - ryaParams.setExportToRya(true); - ryaParams.setAccumuloInstanceName(instanceName); - ryaParams.setZookeeperServers(zookeepers); - ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); - ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); - return params; + private static final ValueFactory VF = new ValueFactoryImpl(); + + // Constants used within the test. + private static final URI ALICE = VF.createURI("urn:Alice"); + private static final URI BOB = VF.createURI("urn:Bob"); + private static final URI TALKS_TO = VF.createURI("urn:talksTo"); + private static final URI LIVES_IN = VF.createURI("urn:livesIn"); + private static final URI WORKS_AT = VF.createURI("urn:worksAt"); + private static final URI HAPPYVILLE = VF.createURI("urn:Happyville"); + private static final URI BURGER_JOINT = VF.createURI("urn:BurgerJoint"); + + @Test + public void visibilitySimplified() throws Exception { + // Create a PCJ index within Rya. + final String sparql = + "SELECT ?customer ?worker ?city " + + "{ " + + "?customer <" + TALKS_TO + "> ?worker. " + + "?worker <" + LIVES_IN + "> ?city. " + + "?worker <" + WORKS_AT + "> <" + BURGER_JOINT + ">. " + + "}"; + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), accumuloConn); + + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + + // Grant the root user the "u" authorization. + accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("u")); + + // Setup a connection to the Rya instance that uses the "u" authorizations. This ensures + // any statements that are inserted will have the "u" authorization on them and that the + // PCJ updating application will have to maintain visibilities. + final AccumuloRdfConfiguration ryaConf = super.makeConfig(instanceName, zookeepers); + ryaConf.set(ConfigUtils.CLOUDBASE_AUTHS, "u"); + + Sail sail = null; + RyaSailRepository ryaRepo = null; + RepositoryConnection ryaConn = null; + + try { + sail = RyaSailFactory.getInstance(ryaConf); + ryaRepo = new RyaSailRepository(sail); + ryaConn = ryaRepo.getConnection(); + + // Load a few Statements into Rya. + ryaConn.add(VF.createStatement(ALICE, TALKS_TO, BOB)); + ryaConn.add(VF.createStatement(BOB, LIVES_IN, HAPPYVILLE)); + ryaConn.add(VF.createStatement(BOB, WORKS_AT, BURGER_JOINT)); + + // Wait for Fluo to finish processing. + fluo.waitForObservers(); + + // Fetch the exported result and show that its column visibility has been simplified. + final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_INSTANCE_NAME, pcjId); + final Scanner scan = accumuloConn.createScanner(pcjTableName, new Authorizations("u")); + scan.fetchColumnFamily(new Text("customer;worker;city")); + + final Entry<Key, Value> result = scan.iterator().next(); + final Key key = result.getKey(); + assertEquals(new Text("u"), key.getColumnVisibility()); + + } finally { + if(ryaConn != null) { + try { + ryaConn.close(); + } finally { } + } + + if(ryaRepo != null) { + try { + ryaRepo.shutDown(); + } finally { } + } + + if(sail != null) { + try { + sail.shutDown(); + } finally { } + } + } } @Test @@ -111,7 +199,8 @@ public class PcjVisibilityIT extends ITBase { // Stream the data into Fluo. for(final RyaStatement statement : streamedTriples.keySet()) { - new InsertTriples().insert(fluoClient, statement, Optional.of(streamedTriples.get(statement))); + final Optional<String> visibility = Optional.of(streamedTriples.get(statement)); + new InsertTriples().insert(fluoClient, statement, visibility); } // Fetch the exported results from Accumulo once the observers finish working.
