Repository: incubator-rya
Updated Branches:
  refs/heads/develop 8168b85a7 -> c53b5402b


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c53b5402/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
new file mode 100644
index 0000000..07211d5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.visibility;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.junit.Test;
+import org.openrdf.model.impl.NumericLiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryException;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import io.fluo.api.client.Snapshot;
+import io.fluo.api.data.Bytes;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.accumulo.VisibilityBindingSet;
+import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer;
+import 
mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException;
+import mvm.rya.indexing.external.tupleSet.PcjTables;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory;
+import mvm.rya.indexing.external.tupleSet.PcjTables.PcjVarOrderFactory;
+import mvm.rya.indexing.external.tupleSet.PcjTables.ShiftVarOrderFactory;
+import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder;
+
+public class PcjVisibilityIT extends ITBase {
+
+    private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+    /**
+     * Points the export observer at the Mini Accumulo instance so that new
+     * PCJ results are exported there.
+     *
+     * @return the parameter map that backs the configured
+     *   {@link RyaExportParameters}
+     */
+    @Override
+    protected Map<String, String> makeExportParams() {
+        final HashMap<String, String> exportParams = new HashMap<>();
+        final RyaExportParameters ryaExport = new RyaExportParameters(exportParams);
+
+        // Turn on the Rya export and tell it which Accumulo instance to write to.
+        ryaExport.setExportToRya(true);
+        ryaExport.setAccumuloInstanceName(accumulo.getInstanceName());
+        ryaExport.setZookeeperServers(accumulo.getZooKeepers());
+
+        // Credentials the exporter connects with.
+        ryaExport.setExporterUsername("root");
+        ryaExport.setExporterPassword("password");
+
+        return exportParams;
+    }
+
+    @Test
+    public void createWithVisibilityDirect() throws RepositoryException, 
PcjException, TableNotFoundException, RyaTypeResolverException, 
AccumuloException, AccumuloSecurityException, BindingSetConversionException {
+        // Create a PCJ table that will include those triples in its results.
+        final String sparql =
+                "SELECT ?name ?age " +
+                "{" +
+                  "FILTER(?age < 30) ." +
+                  "?name <http://hasAge> ?age." +
+                  "?name <http://playsSport> \"Soccer\" " +
+                "}";
+
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+
+        // Create and populate the PCJ table.
+        final PcjTables pcjs = new PcjTables();
+        final PcjVarOrderFactory varOrderFactory = 
Optional.<PcjVarOrderFactory>absent().or(new ShiftVarOrderFactory());
+        final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( 
new VariableOrder(new String[]{"name", "age"}) );
+
+        // Create the PCJ table in Accumulo.
+        pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+        setupTestUsers(pcjTableName);
+
+        // Add a few results to the PCJ table.
+        final MapBindingSet alice = new MapBindingSet();
+        alice.addBinding("name", new URIImpl("http://Alice";));
+        alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+        final MapBindingSet bob = new MapBindingSet();
+        bob.addBinding("name", new URIImpl("http://Bob";));
+        bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+        final MapBindingSet charlie = new MapBindingSet();
+        charlie.addBinding("name", new URIImpl("http://Charlie";));
+        charlie.addBinding("age", new NumericLiteralImpl(12, 
XMLSchema.INTEGER));
+
+        final VisibilityBindingSet aliceVisibility = new 
VisibilityBindingSet(alice, "A&B&C");
+        final VisibilityBindingSet bobVisibility = new 
VisibilityBindingSet(bob, "B&C");
+        final VisibilityBindingSet charlieVisibility = new 
VisibilityBindingSet(charlie, "C");
+
+        final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, 
bob, charlie);
+        final Set<VisibilityBindingSet> visibilityResults = 
Sets.<VisibilityBindingSet>newHashSet(aliceVisibility, bobVisibility, 
charlieVisibility);
+        // Load historic matches from Rya into the PCJ table.
+        pcjs.addResults(accumuloConn, pcjTableName, visibilityResults);
+
+        // Make sure the cardinality was updated.
+        final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, 
pcjTableName);
+        assertEquals(3, metadata.getCardinality());
+
+        // Scan Accumulo for the stored results.
+        Multimap<String, BindingSet> fetchedResults = 
loadPcjResults(accumuloConn, pcjTableName, "A", "B", "C");
+        assertEquals(getExpectedResults(results), fetchedResults);
+        fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C", "B");
+        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(bob, 
charlie)), fetchedResults);
+        fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C");
+        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
+
+        final Connector cConn = 
accumuloConn.getInstance().getConnector("cUser", new PasswordToken("password"));
+        // Scan Accumulo for the stored results.
+        fetchedResults = loadPcjResults(cConn, pcjTableName, "C");
+        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
+        fetchedResults = loadPcjResults(cConn, pcjTableName);
+        assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), 
fetchedResults);
+
+        final Connector noAuthConn = 
accumuloConn.getInstance().getConnector("noAuth", new 
PasswordToken("password"));
+        // Scan Accumulo for the stored results.
+        fetchedResults = loadPcjResults(noAuthConn, pcjTableName);
+        assertTrue(fetchedResults.isEmpty());
+    }
+
+    @Test
+    public void createWithVisibilityFluo() throws Exception {
+        final String sparql =
+                "SELECT ?customer ?worker ?city " +
+                "{ " +
+                  "?customer <http://talksTo> ?worker. " +
+                  "?worker <http://livesIn> ?city. " +
+                  "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
+        // Triples that will be streamed into Fluo after the PCJ has been 
created.
+        final Map<RyaStatement, String> streamedTriples = new HashMap<>();
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Bob";), "A&B");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Bob";, "http://livesIn";, "http://London";), "A");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Bob";, "http://worksAt";, "http://Chipotle";), "B");
+
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Charlie";), "B&C");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Charlie";, "http://livesIn";, "http://London";), "B");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Charlie";, "http://worksAt";, "http://Chipotle";), "C");
+
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://David";), "C&D");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://David";, "http://livesIn";, "http://London";), "C");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://David";, "http://worksAt";, "http://Chipotle";), "D");
+
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Alice";, "http://talksTo";, "http://Eve";), "D&E");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Eve";, "http://livesIn";, "http://Leeds";), "D");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Eve";, "http://worksAt";, "http://Chipotle";), "E");
+
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://talksTo";, "http://Alice";), "");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://livesIn";, "http://London";), "");
+        addStatementVisibilityEntry(streamedTriples, 
makeRyaStatement("http://Frank";, "http://worksAt";, "http://Chipotle";), "");
+
+        // Create the PCJ in Fluo.
+        new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, 
ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql);
+
+        // Stream the data into Fluo.
+        for(final RyaStatement statement : streamedTriples.keySet()) {
+            new InsertTriples().insert(fluoClient, statement, 
Optional.of(streamedTriples.get(statement)));
+        }
+
+        // Fetch the exported results from Accumulo once the observers finish 
working.
+        fluo.waitForObservers();
+
+        // Fetch expected results from the PCJ table that is in Accumulo.
+        final String exportTableName;
+        try(Snapshot snapshot = fluoClient.newSnapshot()) {
+            final Bytes queryId = snapshot.get(Bytes.of(sparql), 
FluoQueryColumns.QUERY_ID);
+            exportTableName = snapshot.get(queryId, 
FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString();
+        }
+        setupTestUsers(exportTableName);
+        Multimap<String, BindingSet> results = loadPcjResults(accumuloConn, 
exportTableName);
+
+        // Verify the end results of the query match the expected results.
+        Multimap<String, BindingSet> expected = makeExpected(
+            makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice";)),
+                new BindingImpl("worker", new URIImpl("http://Bob";)),
+                new BindingImpl("city", new URIImpl("http://London";))),
+            makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice";)),
+                new BindingImpl("worker", new URIImpl("http://Charlie";)),
+                new BindingImpl("city", new URIImpl("http://London";))),
+            makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice";)),
+                new BindingImpl("worker", new URIImpl("http://Eve";)),
+                new BindingImpl("city", new URIImpl("http://Leeds";))),
+            makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice";)),
+                new BindingImpl("worker", new URIImpl("http://David";)),
+                new BindingImpl("city", new URIImpl("http://London";))));
+
+        assertEquals(expected,  results);
+
+        final PasswordToken pass = new PasswordToken("password");
+        Connector userConn = accumuloConn.getInstance().getConnector("abUser", 
pass);
+        results = loadPcjResults(userConn, exportTableName);
+        // Verify the end results of the query match the expected results.
+        expected = makeExpected(
+            makeBindingSet(
+                new BindingImpl("customer", new URIImpl("http://Alice";)),
+                new BindingImpl("worker", new URIImpl("http://Bob";)),
+                new BindingImpl("city", new URIImpl("http://London";))));
+        assertEquals(expected,  results);
+
+        userConn = accumuloConn.getInstance().getConnector("abcUser", pass);
+        results = loadPcjResults(userConn, exportTableName);
+        expected = makeExpected(
+                makeBindingSet(
+                        new BindingImpl("customer", new 
URIImpl("http://Alice";)),
+                        new BindingImpl("worker", new URIImpl("http://Bob";)),
+                        new BindingImpl("city", new URIImpl("http://London";))),
+                makeBindingSet(
+                        new BindingImpl("customer", new 
URIImpl("http://Alice";)),
+                        new BindingImpl("worker", new 
URIImpl("http://Charlie";)),
+                        new BindingImpl("city", new 
URIImpl("http://London";))));
+        assertEquals(expected,  results);
+
+        userConn = accumuloConn.getInstance().getConnector("adeUser", pass);
+        results = loadPcjResults(userConn, exportTableName);
+        expected = makeExpected(
+                makeBindingSet(
+                    new BindingImpl("customer", new URIImpl("http://Alice";)),
+                    new BindingImpl("worker", new URIImpl("http://Eve";)),
+                    new BindingImpl("city", new URIImpl("http://Leeds";))));
+        assertEquals(expected,  results);
+
+        userConn = accumuloConn.getInstance().getConnector("noAuth", pass);
+        results = loadPcjResults(userConn, exportTableName);
+        assertTrue(results.isEmpty());
+    }
+
+    private Multimap<String, BindingSet> makeExpected(final BindingSet... 
bindingSets) {
+        final Set<BindingSet> expectedResults = new HashSet<>();
+        for(final BindingSet bs : bindingSets) {
+            expectedResults.add(bs);
+        }
+
+        final Multimap<String, BindingSet> expected = HashMultimap.create();
+        expected.putAll("customer;worker;city", expectedResults);
+        expected.putAll("worker;city;customer", expectedResults);
+        expected.putAll("city;customer;worker", expectedResults);
+        return expected;
+    }
+
+    private void setupTestUsers(final String exportTableName) throws 
AccumuloException, AccumuloSecurityException {
+        final PasswordToken pass = new PasswordToken("password");
+        final SecurityOperations secOps = accumuloConn.securityOperations();
+        secOps.changeUserAuthorizations("root", new Authorizations("A", "B", 
"C", "D", "E"));
+        secOps.createLocalUser("abUser", pass);
+        secOps.changeUserAuthorizations("abUser", new Authorizations("A", 
"B"));
+        secOps.grantTablePermission("abUser", exportTableName, 
TablePermission.READ);
+
+        secOps.createLocalUser("abcUser", pass);
+        secOps.changeUserAuthorizations("abcUser", new Authorizations("A", 
"B", "C"));
+        secOps.grantTablePermission("abcUser", exportTableName, 
TablePermission.READ);
+
+        secOps.createLocalUser("adeUser", pass);
+        secOps.changeUserAuthorizations("adeUser", new Authorizations("A", 
"D", "E"));
+        secOps.grantTablePermission("adeUser", exportTableName, 
TablePermission.READ);
+
+        secOps.createLocalUser("cUser", pass);
+        secOps.changeUserAuthorizations("cUser", new Authorizations("C"));
+        secOps.grantTablePermission("cUser", exportTableName, 
TablePermission.READ);
+
+        secOps.createLocalUser("noAuth", pass);
+        secOps.changeUserAuthorizations("noAuth", new Authorizations());
+        secOps.grantTablePermission("noAuth", exportTableName, 
TablePermission.READ);
+    }
+
+
+    /**
+     * Builds the expected scan output for the name/age query: the given
+     * results are registered under both the "name;age" and the "age;name"
+     * variable order.
+     *
+     * @param results the binding sets expected under every variable order
+     * @return a multimap from variable order to the expected binding sets
+     */
+    private Multimap<String, BindingSet> getExpectedResults(final Set<BindingSet> results) {
+        final Multimap<String, BindingSet> byVarOrder = HashMultimap.create();
+        for(final String varOrder : new String[] {"name;age", "age;name"}) {
+            byVarOrder.putAll(varOrder, results);
+        }
+        return byVarOrder;
+    }
+
+    /**
+     * Records the visibility expression that goes with a statement by putting
+     * the pair into the given map. An existing entry for the same statement
+     * is replaced.
+     *
+     * @param triplesMap the map from statement to visibility that is updated
+     * @param statement the statement used as the key
+     * @param visibility the visibility expression stored for the statement
+     */
+    protected static void addStatementVisibilityEntry(final Map<RyaStatement, 
String> triplesMap, final RyaStatement statement, final String visibility) {
+        triplesMap.put(statement, visibility);
+    }
+
+    /**
+     * Scan accumulo for the results that are stored in a PCJ table. The
+     * multimap stores a set of deserialized binding sets that were in the PCJ
+     * table for every variable order that is found in the PCJ metadata.
+     *
+     * @param accumuloConn the connector used to read the metadata and scan the table
+     * @param pcjTableName the name of the PCJ table to scan
+     * @param visibility the authorizations to scan with; when none are given,
+     *   the stored authorizations of the connector's user are used instead
+     * @return the binding sets that were read, keyed by the string form of the
+     *   variable order they were stored under
+     * @throws PcjException declared by the PCJ metadata lookup
+     * @throws TableNotFoundException declared by scanner creation for the table
+     * @throws BindingSetConversionException if a stored row could not be
+     *   converted back into a binding set
+     * @throws AccumuloException declared by the lookup of the user's authorizations
+     * @throws AccumuloSecurityException declared by the lookup of the user's authorizations
+     */
+    private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn,
+            final String pcjTableName, final String... visibility) throws PcjException, TableNotFoundException,
+            BindingSetConversionException, AccumuloException, AccumuloSecurityException {
+        final Multimap<String, BindingSet> fetchedResults = HashMultimap.create();
+
+        // Explicit authorizations win; otherwise fall back to what the user was granted.
+        final Authorizations userAuths;
+        if(visibility.length == 0) {
+            userAuths = accumuloConn.securityOperations().getUserAuthorizations(accumuloConn.whoami());
+        } else {
+            userAuths = new Authorizations(visibility);
+        }
+
+        // Get the variable orders the data was written to.
+        final PcjTables pcjs = new PcjTables();
+        final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName);
+
+        // Scan Accumulo for the stored results, one column family per variable order.
+        for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) {
+            final Scanner scanner = accumuloConn.createScanner(pcjTableName, userAuths);
+            try {
+                scanner.fetchColumnFamily( new Text(varOrder.toString()) );
+
+                for(final Entry<Key, Value> entry : scanner) {
+                    // The serialized binding set is stored in the row of the key.
+                    final byte[] serializedResult = entry.getKey().getRow().getBytes();
+                    final BindingSet result = converter.convert(serializedResult, varOrder);
+                    fetchedResults.put(varOrder.toString(), result);
+                }
+            } finally {
+                // Release the scanner's resources even if conversion fails.
+                scanner.close();
+            }
+        }
+
+        return fetchedResults;
+    }
+}

Reply via email to