Repository: incubator-rya Updated Branches: refs/heads/develop 2de66c1f4 -> 3032690ee
Added a deserializeStatement method for the EntityCentricIndex Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3032690e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3032690e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3032690e Branch: refs/heads/develop Commit: 3032690eede4afcc26c9c6048c0773261fd52412 Parents: 2de66c1 Author: Jesse Hatfield <[email protected]> Authored: Fri May 27 11:00:31 2016 -0400 Committer: pujav65 <[email protected]> Committed: Thu Jun 16 11:09:24 2016 -0400 ---------------------------------------------------------------------- .../accumulo/entity/EntityCentricIndex.java | 56 +++++++++ .../accumulo/entity/EntityCentricIndexTest.java | 115 +++++++++++++++++++ 2 files changed, 171 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3032690e/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java index 2ce4331..5633326 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -27,6 +27,7 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -38,6 +39,7 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; @@ -240,6 +242,60 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { )); } + /** + * Deserialize a row from the entity-centric index. + * @param key Row key, contains statement data + * @param value Row value + * @return The statement represented by the row + * @throws IOException if edge direction can't be extracted as expected. + * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object. + */ + public static RyaStatement deserializeStatement(Key key, Value value) throws RyaTypeResolverException, IOException { + assert key != null; + assert value != null; + byte[] entityBytes = key.getRowData().toArray(); + byte[] predicateBytes = key.getColumnFamilyData().toArray(); + byte[] data = key.getColumnQualifierData().toArray(); + long timestamp = key.getTimestamp(); + byte[] columnVisibility = key.getColumnVisibilityData().toArray(); + byte[] valueBytes = value.get(); + + // main entity is either the subject or object + // data contains: column family , var name of other node , data of other node + datatype of object + int split = Bytes.indexOf(data, DELIM_BYTES); + byte[] columnFamily = Arrays.copyOf(data, split); + byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); + split = Bytes.indexOf(edgeBytes, DELIM_BYTES); + String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); + byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length - 2); + byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); + byte[] objectBytes; + RyaURI subject; + RyaURI predicate = new RyaURI(new String(predicateBytes)); + RyaType object; + RyaURI context = null; + // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker} + // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker} + switch (otherNodeVar) { + case "subject": + subject = new RyaURI(new String(otherNodeBytes)); + objectBytes = Bytes.concat(entityBytes, typeBytes); + break; + case "object": + subject = new RyaURI(new String(entityBytes)); + objectBytes = Bytes.concat(otherNodeBytes, typeBytes); + break; + default: + throw new IOException("Failed to deserialize entity-centric index row. " + + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'"); + } + object = RyaContext.getInstance().deserialize(objectBytes); + if (columnFamily != null && columnFamily.length > 0) { + context = new RyaURI(new String(columnFamily)); + } + return new RyaStatement(subject, predicate, object, context, + null, columnVisibility, valueBytes, timestamp); + } @Override public void init() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3032690e/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndexTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndexTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndexTest.java new file mode 100644 index 0000000..e1e65ff --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndexTest.java @@ -0,0 +1,115 @@ +package mvm.rya.indexing.accumulo.entity; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Collection; +import org.junit.Test; +import org.openrdf.model.vocabulary.XMLSchema; + +import com.google.common.primitives.Bytes; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaTypeResolverException; + +public class EntityCentricIndexTest { + private static RyaStatement ryaStatement; + private static Key subjectCentricKey; + private static Key objectCentricKey; + private static Value value; + + @BeforeClass + public static void init() throws RyaTypeResolverException { + String subjectStr = ":subject"; + String predicateStr = ":predicate"; + RyaType object = new RyaType(XMLSchema.INTEGER, "3"); + byte[][] objectBytes = RyaContext.getInstance().serializeType(object); + String contextStr = "http://example.org/graph"; + // no qualifier since entity-centric index doesn't store an actual column qualifier + long timestamp = (long) 123456789; + byte[] visibilityBytes = "test_visibility".getBytes(); + byte[] valueBytes = "test_value".getBytes(); + subjectCentricKey = new Key( + subjectStr.getBytes(), + predicateStr.getBytes(), + Bytes.concat(contextStr.getBytes(), + DELIM_BYTES, "object".getBytes(), + DELIM_BYTES, objectBytes[0], objectBytes[1]), + visibilityBytes, timestamp); + objectCentricKey = new Key( + objectBytes[0], + predicateStr.getBytes(), + Bytes.concat(contextStr.getBytes(), + DELIM_BYTES, "subject".getBytes(), + DELIM_BYTES, subjectStr.getBytes(), objectBytes[1]), + visibilityBytes, timestamp); + ryaStatement = new RyaStatement( + new RyaURI(subjectStr), + new RyaURI(predicateStr), + new RyaType(XMLSchema.INTEGER, "3"), + new RyaURI(contextStr), + null, visibilityBytes, valueBytes, timestamp); + value = new Value(valueBytes); + } + + private static Mutation createMutationFromKeyValue(Key key, Value value) { + Mutation m = new Mutation(key.getRow()); + m.put(key.getColumnFamily(), key.getColumnQualifier(), + new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp(), value); + return m; + } + + @Test + public void testSerializeStatement() throws RyaTypeResolverException { + Collection<Mutation> indexMutations = EntityCentricIndex.createMutations(ryaStatement); + Assert.assertEquals("Serialization should produce two rows: subject-centric and object-centric.", + 2, indexMutations.size()); + Assert.assertTrue("Serialization of RyaStatement failed to create equivalent subject-centric row.", + indexMutations.contains(createMutationFromKeyValue(subjectCentricKey, value))); + Assert.assertTrue("Serialization of RyaStatement failed to create equivalent object-centric row.", + indexMutations.contains(createMutationFromKeyValue(objectCentricKey, value))); + } + + @Test + public void testDeserializeSubjectRow() throws RyaTypeResolverException, IOException { + RyaStatement deserialized = EntityCentricIndex.deserializeStatement(subjectCentricKey, value); + Assert.assertEquals("Deserialization of subject-centric row failed to produce equivalent RyaStatement.", + ryaStatement, deserialized); + } + + @Test + public void testDeserializeObjectRow() throws RyaTypeResolverException, IOException { + RyaStatement deserialized = EntityCentricIndex.deserializeStatement(objectCentricKey, value); + Assert.assertEquals("Deserialization of object-centric row failed to produce equivalent RyaStatement.", + ryaStatement, deserialized); + } +}
