http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java new file mode 100644 index 0000000..83d1c40 --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.model.Value; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.primitives.Bytes; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTypeResolverException; + +/** + * Converts {@link BindingSet}s to byte[]s and back again. The bytes do not + * include the binding names and are ordered with a {@link VariableOrder}. + */ +@ParametersAreNonnullByDefault +public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> { + + @Override + public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException { + checkNotNull(bindingSet); + checkNotNull(varOrder); + checkBindingsSubsetOfVarOrder(bindingSet, varOrder); + + // A list that holds all of the byte segments that will be concatenated at the end. + // This minimizes byte[] construction. + final List<byte[]> byteSegments = new LinkedList<>(); + + try { + for(final String varName: varOrder) { + // Only write information for a variable name if the binding set contains it. 
+ if(bindingSet.hasBinding(varName)) { + final RyaType rt = RdfToRyaConversions.convertValue(bindingSet.getBinding(varName).getValue()); + final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt); + byteSegments.add(serializedVal[0]); + byteSegments.add(serializedVal[1]); + } + + // But always write the value delimiter. If a value is missing, you'll see two delimiters next to each-other. + byteSegments.add(DELIM_BYTES); + } + + return concat(byteSegments); + } catch (RyaTypeResolverException e) { + throw new BindingSetConversionException("Could not convert the BindingSet into a byte[].", e); + } + } + + @Override + public BindingSet convert(byte[] bindingSetBytes, VariableOrder varOrder) throws BindingSetConversionException { + checkNotNull(bindingSetBytes); + checkNotNull(varOrder); + + try { + // Slice the row into bindings. + List<byte[]> values = splitlByDelimByte(bindingSetBytes); + String[] varOrderStrings = varOrder.toArray(); + checkArgument(values.size() == varOrderStrings.length); + + // Convert the Binding bytes into a BindingSet. + final QueryBindingSet bindingSet = new QueryBindingSet(); + + for(int i = 0; i < varOrderStrings.length; i++) { + byte[] valueBytes = values.get(i); + if(valueBytes.length > 0) { + String name = varOrderStrings[i]; + Value value = deserializeValue(valueBytes); + bindingSet.addBinding(name, value); + } + } + + return bindingSet; + } catch (RyaTypeResolverException e) { + throw new BindingSetConversionException("Could not convert the byte[] into a BindingSet.", e); + } + } + + /** + * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet} + * are a subset of the variables names in {@link VariableOrder}. + * + * @param bindingSet - The binding set whose Bindings will be inspected. (not null) + * @param varOrder - The names of the bindings that may appear in the BindingSet. 
(not null) + * @throws IllegalArgumentException Indicates the names of the bindings are + * not a subset of the variable order. + */ + private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException { + checkNotNull(bindingSet); + checkNotNull(varOrder); + + Set<String> bindingNames = bindingSet.getBindingNames(); + List<String> varNames = varOrder.getVariableOrders(); + checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder."); + } + + private static final byte[] concat(Iterable<byte[]> byteSegments) { + checkNotNull(byteSegments); + + // Allocate a byte array that is able to hold the segments. + int length = 0; + for(byte[] byteSegment : byteSegments) { + length += byteSegment.length; + } + byte[] result = new byte[length]; + + // Copy the segments to the byte array and return it. + ByteBuffer buff = ByteBuffer.wrap(result); + for(byte[] byteSegment : byteSegments) { + buff.put(byteSegment); + } + return result; + } + + private static List<byte[]> splitlByDelimByte(byte[] bindingSetBytes) { + checkNotNull(bindingSetBytes); + + List<byte[]> values = new LinkedList<>(); + + ByteBuffer buff = ByteBuffer.wrap(bindingSetBytes); + int start = 0; + while(buff.hasRemaining()) { + if(buff.get() == DELIM_BYTE) { + // Mark the position of the value delimiter. + int end = buff.position(); + + // Move to the start of the value and copy the bytes into an array. + byte[] valueBytes = new byte[(end - start) -1]; + buff.position(start); + buff.get(valueBytes); + buff.position(end); + values.add(valueBytes); + + // Move the start of the next value to the end of this one. 
+ start = end; + } + } + + return values; + } + + private static Value deserializeValue(byte[] byteVal) throws RyaTypeResolverException { + final int typeIndex = Bytes.indexOf(byteVal, TYPE_DELIM_BYTE); + checkArgument(typeIndex >= 0); + final byte[] data = Arrays.copyOf(byteVal, typeIndex); + final byte[] type = Arrays.copyOfRange(byteVal, typeIndex, byteVal.length); + final RyaType rt = RyaContext.getInstance().deserialize(Bytes.concat(data,type)); + return RyaToRdfConversions.convertValue(rt); + } +} \ No newline at end of file
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import javax.annotation.ParametersAreNonnullByDefault;

import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;

/**
 * Translates {@link BindingSet}s between their object form and the storage
 * representation used by a particular backing store. Those storage formats
 * are typically optimized for query evaluation.
 *
 * @param <T> The storage model that {@link BindingSet}s are translated into/from.
 */
@ParametersAreNonnullByDefault
public interface BindingSetConverter<T> {

    /**
     * Translates a {@link BindingSet} into the storage model. The result may
     * omit some of the original {@link Binding}s, does not carry the binding
     * names, and orders the values according to the supplied
     * {@link VariableOrder}.
     * </p>
     * Because the binding names are not part of the result, the caller must
     * retain the VariableOrder to translate the result back later. And because
     * only a subset of the bindings may be written, a round trip is not
     * guaranteed to reproduce the original BindingSet.
     *
     * @param bindingSet - The BindingSet to translate. (not null)
     * @param varOrder - Which bindings appear in the result, and in what
     *   order. (not null)
     * @return The storage-model form of the BindingSet.
     * @throws BindingSetConversionException The BindingSet could not be
     *   translated; for example, it contains a binding whose name is absent
     *   from the VariableOrder, or one of its values has no storage-model
     *   representation.
     */
    T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException;

    /**
     * Translates a storage-model value, as produced by
     * {@link #convert(BindingSet, VariableOrder)}, back into a
     * {@link BindingSet}.
     * </p>
     * The binding names and their order must be supplied via the same
     * {@link VariableOrder} that was used to create the storage-model value.
     * </p>
     * Variable order names that have no value in the storage-model form are
     * simply absent from the resulting BindingSet.
     *
     * @param bindingSet - The storage-model form of a BindingSet. (not null)
     * @param varOrder - The VariableOrder the storage-model value was written
     *   with. (not null)
     * @return The {@link BindingSet} the storage-model value represents.
     * @throws BindingSetConversionException The storage-model value could not
     *   be translated back into a BindingSet.
     */
    BindingSet convert(T bindingSet, VariableOrder varOrder) throws BindingSetConversionException;

    /**
     * Thrown when one of the {@link BindingSetConverter} conversion methods
     * fails to translate a {@link BindingSet} to or from the storage model.
     */
    static class BindingSetConversionException extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * Constructs an instance of {@link BindingSetConversionException}.
         *
         * @param message - Describes why this exception was thrown.
         */
        public BindingSetConversionException(final String message) {
            super(message);
        }

        /**
         * Constructs an instance of {@link BindingSetConversionException}.
         *
         * @param message - Describes why this exception was thrown.
         * @param cause - The exception that caused this one to be thrown.
         */
        public BindingSetConversionException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Iterator;
import java.util.Set;

import org.openrdf.model.Value;
import org.openrdf.query.Binding;
import org.openrdf.query.BindingSet;

/**
 * Abstracts out the decoration of a {@link BindingSet}.
 */
+ */ +public abstract class BindingSetDecorator implements BindingSet { + private static final long serialVersionUID = 1L; + protected final BindingSet set; + private volatile int hashCode; + + /** + * Constructs a new {@link BindingSetDecorator}, decorating the provided + * {@link BindingSet}. + * @param set - The {@link BindingSet} to be decorated. (not null) + */ + public BindingSetDecorator(final BindingSet set) { + this.set = checkNotNull(set); + } + + @Override + public Iterator<Binding> iterator() { + return set.iterator(); + } + + @Override + public Set<String> getBindingNames() { + return set.getBindingNames(); + } + + @Override + public Binding getBinding(final String bindingName) { + return set.getBinding(bindingName); + } + + @Override + public boolean hasBinding(final String bindingName) { + return set.hasBinding(bindingName); + } + + @Override + public Value getValue(final String bindingName) { + return set.getValue(bindingName); + } + + @Override + public int size() { + return set.size(); + } + + @Override + public boolean equals(final Object o) { + if(!(o instanceof BindingSetDecorator)) { + return false; + } + final BindingSetDecorator other = (BindingSetDecorator) o; + return set.equals(other.set); + } + + @Override + public int hashCode() { + int result = hashCode; + if(result == 0) { + result = 31 * result + set.hashCode(); + hashCode = result; + } + return result; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append(" names: "); + for (final String name : getBindingNames()) { + sb.append("\n [name]: " + name + " --- [value]: " + getBinding(name).getValue().toString()); + } + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java new file mode 100644 index 0000000..913cfc9 --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Joiner; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.RdfToRyaConversions; + +/** + * Converts {@link BindingSet}s to Strings and back again. The Strings do not + * include the binding names and are ordered with a {@link VariableOrder}. + */ +@ParametersAreNonnullByDefault +public class BindingSetStringConverter implements BindingSetConverter<String> { + + public static final String BINDING_DELIM = ":::"; + public static final String TYPE_DELIM = "<<~>>"; + public static final String NULL_VALUE_STRING = Character.toString( '\0' ); + + private static final ValueFactory valueFactory = new ValueFactoryImpl(); + + @Override + public String convert(final BindingSet bindingSet, final VariableOrder varOrder) { + checkBindingsSubsetOfVarOrder(bindingSet, varOrder); + + // Convert each Binding to a String. + final List<String> bindingStrings = new ArrayList<>(); + for(final String varName : varOrder) { + if(bindingSet.hasBinding(varName)) { + // Add a value to the binding set. 
+ final Value value = bindingSet.getBinding(varName).getValue(); + final RyaType ryaValue = RdfToRyaConversions.convertValue(value); + final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType(); + bindingStrings.add(bindingString); + } else { + // Add a null value to the binding set. + bindingStrings.add(NULL_VALUE_STRING); + } + } + + // Join the bindings using the binding delim. + return Joiner.on(BINDING_DELIM).join(bindingStrings); + } + + /** + * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet} + * are a subset of the variables names in {@link VariableOrder}. + * + * @param bindingSet - The binding set whose Bindings will be inspected. (not null) + * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null) + * @throws IllegalArgumentException Indicates the names of the bindings are + * not a subset of the variable order. + */ + private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException { + checkNotNull(bindingSet); + checkNotNull(varOrder); + + final Set<String> bindingNames = bindingSet.getBindingNames(); + final List<String> varNames = varOrder.getVariableOrders(); + checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder."); + } + + @Override + public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) { + checkNotNull(bindingSetString); + checkNotNull(varOrder); + + final String[] bindingStrings = bindingSetString.split(BINDING_DELIM); + final String[] varOrrderArr = varOrder.toArray(); + checkArgument(varOrrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder."); + + final QueryBindingSet bindingSet = new QueryBindingSet(); + for(int i = 0; i < bindingStrings.length; i++) { + final String bindingString = bindingStrings[i]; + 
if(!NULL_VALUE_STRING.equals(bindingString)) { + final String name = varOrrderArr[i]; + final Value value = toValue(bindingStrings[i]); + bindingSet.addBinding(name, value); + } + } + return bindingSet; + } + + /** + * Creates a {@link Value} from a String representation of it. + * + * @param valueString - The String representation of the value. (not null) + * @return The {@link Value} representation of the String. + */ + protected static Value toValue(final String valueString) { + checkNotNull(valueString); + + // Split the String that was stored in Fluo into its Value and Type parts. + final String[] valueAndType = valueString.split(TYPE_DELIM); + if(valueAndType.length != 2) { + throw new IllegalArgumentException("Array must contain data and type info!"); + } + + final String dataString = valueAndType[0]; + final String typeString = valueAndType[1]; + + // Convert the String Type into a URI that describes the type. + final URI typeURI = valueFactory.createURI(typeString); + + // Convert the String Value into a Value. + final Value value = typeURI.equals(XMLSchema.ANYURI) ? 
+ valueFactory.createURI(dataString) : + valueFactory.createLiteral(dataString, new URIImpl(typeString)); + + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java new file mode 100644 index 0000000..653b4cc --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import java.util.UUID; + +/** + * Creates Accumulo table names that may be recognized by Rya as a table that + * holds the results of a Precomputed Join. 
/**
 * Creates Accumulo table names that may be recognized by Rya as tables that
 * hold the results of a Precomputed Join.
 */
public class PcjTableNameFactory {

    /**
     * Creates an Accumulo table name that may be recognized by Rya as a table
     * that holds the results of a Precomputed Join.
     * </p>
     * An Accumulo cluster may host more than one Rya instance. To keep each
     * instance's tables segregated, every table owned by an instance is
     * prepended with that instance's {@code tablePrefix}, and PCJ tables are
     * no exception.
     * </p>
     * When Rya scans for PCJ tables it may use in execution plans, it looks
     * for any Accumulo table whose name starts with its {@code tablePrefix}
     * immediately followed by "INDEX". Anything after that is a unique
     * identifier for the precomputed SPARQL query. For example, in
     * "demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9", the
     * "demo_INDEX" portion marks this as a PCJ table for the "demo_" Rya
     * instance and the rest uniquely identifies the query.
     *
     * @param tablePrefix - The Rya instance's table prefix. (not null)
     * @param uniqueId - The unique portion of the Rya PCJ table name. (not null)
     * @return A Rya PCJ table name built using the provided values.
     * @throws NullPointerException Either argument is null.
     */
    public String makeTableName(final String tablePrefix, final String uniqueId) {
        // Enforce the documented "(not null)" contract. Previously a null
        // prefix silently produced a table named "nullINDEX_...".
        java.util.Objects.requireNonNull(tablePrefix, "tablePrefix");
        java.util.Objects.requireNonNull(uniqueId, "uniqueId");
        return tablePrefix + "INDEX_" + uniqueId;
    }

    /**
     * Invokes {@link #makeTableName(String, String)} with a randomly generated
     * UUID (dashes removed) as the {@code uniqueId}.
     *
     * @param tablePrefix - The Rya instance's table prefix. (not null)
     * @return A Rya PCJ table name built using the provided values.
     * @throws NullPointerException The table prefix is null.
     */
    public String makeTableName(final String tablePrefix) {
        final String uniqueId = UUID.randomUUID().toString().replaceAll("-", "");
        return makeTableName(tablePrefix, uniqueId);
    }
}
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.lexicoder.ListLexicoder; +import org.apache.accumulo.core.client.lexicoder.LongLexicoder; +import org.apache.accumulo.core.client.lexicoder.StringLexicoder; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.google.common.base.Optional; + +/** + * Functions that create and maintain the PCJ tables that are used by Rya. + */ +@ParametersAreNonnullByDefault +public class PcjTables { + private static final Logger log = Logger.getLogger(PcjTables.class); + + /** + * The Row ID of all {@link PcjMetadata} entries that are stored in Accumulo. + */ + private static final Text PCJ_METADATA_ROW_ID = new Text("pcjMetadata"); + + /** + * The Column Family for all PCJ metadata entries. + */ + private static final Text PCJ_METADATA_FAMILY = new Text("metadata"); + + /** + * The Column Qualifier for the SPARQL query a PCJ is built from. + */ + private static final Text PCJ_METADATA_SPARQL_QUERY = new Text("sparql"); + + /** + * The Column Qualifier for the cardinality of a PCJ. + */ + private static final Text PCJ_METADATA_CARDINALITY = new Text("cardinality"); + + /** + * The Column Qualifier for the various variable orders a PCJ's results are written to. + */ + private static final Text PCJ_METADATA_VARIABLE_ORDERS = new Text("variableOrders"); + + // Lexicoders used to read/write PcjMetadata to/from Accumulo. + private static final LongLexicoder longLexicoder = new LongLexicoder(); + private static final StringLexicoder stringLexicoder = new StringLexicoder(); + private static final ListLexicoder<String> listLexicoder = new ListLexicoder<>(stringLexicoder); + + /** + * Create a new PCJ table within an Accumulo instance for a SPARQL query. 
+ * For example, calling the function like this: + * <pre> + * PcjTables.createPcjTable( + * accumuloConn, + * + * "foo_INDEX_query1234", + * + * Sets.newHashSet( + * new VariableOrder("city;worker;customer"), + * new VariableOrder("worker;customer;city") , + * new VariableOrder("customer;city;worker")), + * + * "SELECT ?customer ?worker ?city { " + + * "?customer <http://talksTo> ?worker. " + + * "?worker <http://livesIn> ?city. " + + * "?worker <http://worksAt> <http://Home>. " + + * "}"); + * </pre> + * </p> + * Will result in an Accumulo table named "foo_INDEX_query1234" with the following entries: + * <table border="1" style="width:100%"> + * <tr> <th>Row ID</td> <th>Column</td> <th>Value</td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:sparql</td> <td> ... UTF-8 bytes encoding the query string ... </td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:cardinality</td> <td> The query's cardinality </td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:variableOrders</td> <td> The variable orders the results are written to </td> </tr> + * </table> + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the table that will be created. (not null) + * @param varOrders - The variable orders the results within the table will be written to. (not null) + * @param sparql - The query this table's results solves. (not null) + * @throws PCJStorageException Could not create a new PCJ table either because Accumulo + * would not let us create it or the PCJ metadata was not able to be written to it. 
+ */ + public void createPcjTable( + final Connector accumuloConn, + final String pcjTableName, + final Set<VariableOrder> varOrders, + final String sparql) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(varOrders); + checkNotNull(sparql); + + final TableOperations tableOps = accumuloConn.tableOperations(); + if(!tableOps.exists(pcjTableName)) { + try { + // Create the new table in Accumulo. + tableOps.create(pcjTableName); + + // Write the PCJ Metadata to the newly created table. + final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders); + final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata); + + final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + writer.addMutations(mutations); + writer.close(); + } catch (final TableExistsException e) { + log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName + + "'. This is unexpected, but we will continue as normal."); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e); + } + } + } + + /** + * Create the {@link Mutation}s required to write a {@link PCJMetadata} object + * to an Accumulo table. + * + * @param metadata - The metadata to write. (not null) + * @return An ordered list of mutations that write the metadata to an Accumulo table. 
+ */ + private static List<Mutation> makeWriteMetadataMutations(final PcjMetadata metadata) { + checkNotNull(metadata); + + final List<Mutation> mutations = new LinkedList<>(); + + // SPARQL Query + Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID); + final Value query = new Value( stringLexicoder.encode(metadata.getSparql()) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_SPARQL_QUERY, query); + mutations.add(mutation); + + // Cardinality + mutation = new Mutation(PCJ_METADATA_ROW_ID); + final Value cardinality = new Value( longLexicoder.encode(new Long(metadata.getCardinality())) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, cardinality); + mutations.add(mutation); + + // Variable Orders + final List<String> varOrderStrings = new ArrayList<>(); + for(final VariableOrder varOrder : metadata.getVarOrders()) { + varOrderStrings.add( varOrder.toString() ); + } + + mutation = new Mutation(PCJ_METADATA_ROW_ID); + final Value variableOrders = new Value( listLexicoder.encode(varOrderStrings) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_VARIABLE_ORDERS, variableOrders); + mutations.add(mutation); + + return mutations; + } + + /** + * Fetch the {@link PCJMetadata} from an Accumulo table. + * <p> + * This method assumes the PCJ table has already been created. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the table that will be search. (not null) + * @return The PCJ Metadata that has been stolred in the in the PCJ Table. + * @throws PCJStorageException The PCJ Table does not exist. + */ + public PcjMetadata getPcjMetadata( + final Connector accumuloConn, + final String pcjTableName) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + + try { + // Create an Accumulo scanner that iterates through the metadata entries. 
+ final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); + final Iterator<Entry<Key, Value>> entries = scanner.iterator(); + + // No metadata has been stored in the table yet. + if(!entries.hasNext()) { + throw new PCJStorageException("Could not find any PCJ metadata in the table named: " + pcjTableName); + } + + // Fetch the metadata from the entries. Assuming they all have the same cardinality and sparql query. + String sparql = null; + Long cardinality = null; + final Set<VariableOrder> varOrders = new HashSet<>(); + + while(entries.hasNext()) { + final Entry<Key, Value> entry = entries.next(); + final Text columnQualifier = entry.getKey().getColumnQualifier(); + final byte[] value = entry.getValue().get(); + + if(columnQualifier.equals(PCJ_METADATA_SPARQL_QUERY)) { + sparql = stringLexicoder.decode(value); + } else if(columnQualifier.equals(PCJ_METADATA_CARDINALITY)) { + cardinality = longLexicoder.decode(value); + } else if(columnQualifier.equals(PCJ_METADATA_VARIABLE_ORDERS)) { + for(final String varOrderStr : listLexicoder.decode(value)) { + varOrders.add( new VariableOrder(varOrderStr) ); + } + } + } + + return new PcjMetadata(sparql, cardinality, varOrders); + + } catch (final TableNotFoundException e) { + throw new PCJStorageException("Could not add results to a PCJ because the PCJ table does not exist.", e); + } + } + + /** + * Add a collection of results to a PCJ table. The table's cardinality will + * be updated to include the new results. + * <p> + * This method assumes the PCJ table has already been created. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will receive the results. (not null) + * @param results - Binding sets that will be written to the PCJ table. 
(not null) + * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the + * PCJ metadata, or the result could not be written to it. + */ + public void addResults( + final Connector accumuloConn, + final String pcjTableName, + final Collection<VisibilityBindingSet> results) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(results); + + // Write a result to each of the variable orders that are in the table. + writeResults(accumuloConn, pcjTableName, results); + + // Increment the cardinality of the query by the number of new results. + updateCardinality(accumuloConn, pcjTableName, results.size()); + } + + /** + * Add a collection of results to a specific PCJ table. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will receive the results. (not null) + * @param results - Binding sets that will be written to the PCJ table. (not null) + * @throws PCJStorageException The provided PCJ table doesn't exist, is missing the + * PCJ metadata, or the result could not be written to it. + */ + private void writeResults( + final Connector accumuloConn, + final String pcjTableName, + final Collection<VisibilityBindingSet> results) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(results); + + // Fetch the variable orders from the PCJ table. + final PcjMetadata metadata = getPcjMetadata(accumuloConn, pcjTableName); + + // Write each result formatted using each of the variable orders. 
+ BatchWriter writer = null; + try { + writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + for(final VisibilityBindingSet result : results) { + final Set<Mutation> addResultMutations = makeWriteResultMutations(metadata.getVarOrders(), result); + writer.addMutations( addResultMutations ); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new PCJStorageException("Could not add results to the PCJ table named: " + pcjTableName, e); + } finally { + if(writer != null) { + try { + writer.close(); + } catch (final MutationsRejectedException e) { + throw new PCJStorageException("Could not add results to a PCJ table because some of the mutations were rejected.", e); + } + } + } + } + + /** + * Create the {@link Mutations} required to write a new {@link BindingSet} + * to a PCJ table for each {@link VariableOrder} that is provided. + * + * @param varOrders - The variables orders the result will be written to. (not null) + * @param result - A new PCJ result. (not null) + * @return Mutation that will write the result to a PCJ table. + * @throws PCJStorageException The binding set could not be encoded. + */ + private static Set<Mutation> makeWriteResultMutations( + final Set<VariableOrder> varOrders, + final VisibilityBindingSet result) throws PCJStorageException { + checkNotNull(varOrders); + checkNotNull(result); + + final Set<Mutation> mutations = new HashSet<>(); + final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + + for(final VariableOrder varOrder : varOrders) { + try { + // Serialize the result to the variable order. + final byte[] serializedResult = converter.convert(result, varOrder); + + // Row ID = binding set values, Column Family = variable order of the binding set. 
+ final Mutation addResult = new Mutation(serializedResult); + final String visibility = result.getVisibility(); + addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), ""); + mutations.add(addResult); + } catch(final BindingSetConversionException e) { + throw new PCJStorageException("Could not serialize a result.", e); + } + } + + return mutations; + } + + /** + * Update the cardinality of a PCJ by a {@code delta}. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will have its cardinality updated. (not null) + * @param delta - How much the cardinality will change. + * @throws PCJStorageException The cardinality could not be updated. + */ + private void updateCardinality( + final Connector accumuloConn, + final String pcjTableName, + final long delta) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + + ConditionalWriter conditionalWriter = null; + try { + conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig()); + + boolean updated = false; + while(!updated) { + // Write the conditional update request to Accumulo. + final long cardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality(); + final ConditionalMutation mutation = makeUpdateCardinalityMutation(cardinality, delta); + final ConditionalWriter.Result result = conditionalWriter.write(mutation); + + // Interpret the result. + switch(result.getStatus()) { + case ACCEPTED: + updated = true; + break; + case REJECTED: + break; + case UNKNOWN: + // We do not know if the mutation succeeded. At best, we can hope the metadata hasn't been updated + // since we originally fetched it and try again. Otherwise, continue forwards as if it worked. It's + // okay if this number is slightly off. 
+ final long newCardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality(); + if(newCardinality != cardinality) { + updated = true; + } + break; + case VIOLATED: + throw new PCJStorageException("The cardinality could not be updated because the commit violated a table constraint."); + case INVISIBLE_VISIBILITY: + throw new PCJStorageException("The condition contains a visibility the updater can not satisfy."); + } + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e); + } finally { + if(conditionalWriter != null) { + conditionalWriter.close(); + } + } + } + + /** + * Creates a {@link ConditionalMutation} that only updates the cardinality + * of the PCJ table if the old value has not changed by the time this mutation + * is committed to Accumulo. + * + * @param current - The current cardinality value. + * @param delta - How much the cardinality will change. + * @return The mutation that will perform the conditional update. + */ + private static ConditionalMutation makeUpdateCardinalityMutation(final long current, final long delta) { + // Try to update the cardinality by the delta. + final ConditionalMutation mutation = new ConditionalMutation(PCJ_METADATA_ROW_ID); + final Condition lastCardinalityStillCurrent = new Condition( + PCJ_METADATA_FAMILY, + PCJ_METADATA_CARDINALITY); + + // Require the old cardinality to be the value we just read. + final byte[] currentCardinalityBytes = longLexicoder.encode( current ); + lastCardinalityStillCurrent.setValue( currentCardinalityBytes ); + mutation.addCondition(lastCardinalityStillCurrent); + + // If that is the case, then update to the new value. 
+ final Value newCardinality = new Value( longLexicoder.encode(current + delta) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, newCardinality); + return mutation; + } + + /** + * Scan Rya for results that solve the PCJ's query and store them in the PCJ table. + * <p> + * This method assumes the PCJ table has already been created. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will receive the results. (not null) + * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null) + * @throws PCJStorageException If results could not be written to the PCJ table, + * the PCJ table does not exist, or the query that is being execute + * was malformed. + */ + public void populatePcj( + final Connector accumuloConn, + final String pcjTableName, + final RepositoryConnection ryaConn) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(ryaConn); + + try { + // Fetch the query that needs to be executed from the PCJ table. + final PcjMetadata pcjMetadata = getPcjMetadata(accumuloConn, pcjTableName); + final String sparql = pcjMetadata.getSparql(); + + // Query Rya for results to the SPARQL query. 
+ final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql); + final TupleQueryResult results = query.evaluate(); + + // Load batches of 1000 of them at a time into the PCJ table + final Set<VisibilityBindingSet> batch = new HashSet<>(1000); + while(results.hasNext()) { + batch.add( new VisibilityBindingSet(results.next()) ); + + if(batch.size() == 1000) { + addResults(accumuloConn, pcjTableName, batch); + batch.clear(); + } + } + + if(!batch.isEmpty()) { + addResults(accumuloConn, pcjTableName, batch); + } + + } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) { + throw new PCJStorageException("Could not populate a PCJ table with Rya results for the table named: " + pcjTableName, e); + } + } + + private static final PcjVarOrderFactory DEFAULT_VAR_ORDER_FACTORY = new ShiftVarOrderFactory(); + + /** + * Creates a new PCJ Table in Accumulo and populates it by scanning an + * instance of Rya for historic matches. + * <p> + * If any portion of this operation fails along the way, the partially + * create PCJ table will be left in Accumulo. + * + * @param ryaConn - Connects to the Rya that will be scanned. (not null) + * @param accumuloConn - Connects to the accumulo that hosts the PCJ results. (not null) + * @param pcjTableName - The name of the PCJ table that will be created. (not null) + * @param sparql - The SPARQL query whose results will be loaded into the table. (not null) + * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null) + * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders + * the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory} + * is used by default. (not null) + * @throws PCJStorageException The PCJ table could not be create or the values from + * Rya were not able to be loaded into it. 
+ */ + public void createAndPopulatePcj( + final RepositoryConnection ryaConn, + final Connector accumuloConn, + final String pcjTableName, + final String sparql, + final String[] resultVariables, + final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PCJStorageException { + checkNotNull(ryaConn); + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(sparql); + checkNotNull(resultVariables); + checkNotNull(pcjVarOrderFactory); + + // Create the PCJ's variable orders. + final PcjVarOrderFactory varOrderFactory = pcjVarOrderFactory.or(DEFAULT_VAR_ORDER_FACTORY); + final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( new VariableOrder(resultVariables) ); + + // Create the PCJ table in Accumulo. + createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Load historic matches from Rya into the PCJ table. + populatePcj(accumuloConn, pcjTableName, ryaConn); + } + + /** + * List the table names of the PCJ index tables that are stored in Accumulo + * for a specific instance of Rya. + * + * @param accumuloConn - Connects to the accumulo that hosts the PCJ indices. (not null) + * @param ryaInstanceName - The name of the Rya instance. (not null) + * @return A list of Accumulo table names that hold PCJ index data for a + * specific Rya instance. + */ + public List<String> listPcjTables(final Connector accumuloConn, final String ryaInstanceName) { + checkNotNull(accumuloConn); + checkNotNull(ryaInstanceName); + + final List<String> pcjTables = new ArrayList<>(); + + final String pcjPrefix = ryaInstanceName + "INDEX"; + boolean foundInstance = false; + + for(final String tableName : accumuloConn.tableOperations().list()) { + if(tableName.startsWith(ryaInstanceName)) { + // This table is part of the target Rya instance. 
+ foundInstance = true; + + if(tableName.startsWith(pcjPrefix)) { + pcjTables.add(tableName); + } + } + + else if(foundInstance) { + // We have encountered the first table name that does not start + // with the rya instance name after those that do. Because the + // list is sorted, there can't be any more pcj tables for the + // target instance in the list. + break; + } + } + + return pcjTables; + } + + /** + * Deletes all of the rows that are in a PCJ index and sets its cardinality back to 0. + * + * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null) + * @param pcjTableName - The name of the PCJ table that will be purged. (not null) + * @throws PCJStorageException Either the rows could not be dropped from the + * PCJ table or the metadata could not be written back to the table. + */ + public void purgePcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + + // Fetch the metadaata from the PCJ table. + final PcjMetadata oldMetadata = getPcjMetadata(accumuloConn, pcjTableName); + + // Delete all of the rows + try { + accumuloConn.tableOperations().deleteRows(pcjTableName, null, null); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PCJStorageException("Could not delete the rows of data from PCJ table named: " + pcjTableName, e); + } + + // Store the new metadata. 
+ final PcjMetadata newMetadata = new PcjMetadata(oldMetadata.getSparql(), 0L, oldMetadata.getVarOrders()); + final List<Mutation> mutations = makeWriteMetadataMutations(newMetadata); + + BatchWriter writer = null; + try { + writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + writer.addMutations(mutations); + writer.flush(); + } catch (final TableNotFoundException | MutationsRejectedException e) { + throw new PCJStorageException("Could not rewrite the PCJ cardinality for table named '" + + pcjTableName + "'. This table will not work anymore.", e); + } finally { + if(writer != null) { + try { + writer.close(); + } catch (final MutationsRejectedException e) { + throw new PCJStorageException("Could not close the batch writer.", e); + } + } + } + } + + /** + * Drops a PCJ index from Accumulo. + * + * @param accumuloConn - Connects to the Accumulo that hosts the PCJ indices. (not null) + * @param pcjTableName - The name of the PCJ table that will be dropped. (not null) + * @throws PCJStorageException - The table could not be dropped because of + * a security exception or because it does not exist. 
+ */ + public void dropPcjTable(final Connector accumuloConn, final String pcjTableName) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + try { + accumuloConn.tableOperations().delete(pcjTableName); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PCJStorageException("Could not delete PCJ table named: " + pcjTableName, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java new file mode 100644 index 0000000..f806d6e --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.rya.indexing.pcj.storage.accumulo;

import java.util.Set;

/**
 * Produces alternative variable orders for a SPARQL query, derived from
 * the original ordering of its results.
 */
public interface PcjVarOrderFactory {

    /**
     * Create alternative variable orders for a SPARQL query based on
     * the original ordering of its results.
     *
     * @param varOrder - The initial variable order of a SPARQL query. (not null)
     * @return A set of alternative variable orders for the original.
     */
    Set<VariableOrder> makeVarOrders(VariableOrder varOrder);
}
package org.apache.rya.indexing.pcj.storage.accumulo;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.ParametersAreNonnullByDefault;

import com.google.common.collect.Lists;

/**
 * Shifts the variables to the left so that each variable will appear at
 * the head of the varOrder once.
 */
@ParametersAreNonnullByDefault
public class ShiftVarOrderFactory implements PcjVarOrderFactory {
    @Override
    public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) {
        // Mutable rotation buffer seeded with the original ordering.
        final List<String> rotation = Lists.newArrayList( varOrder.getVariableOrders() );
        final int size = rotation.size();

        final Set<VariableOrder> varOrders = new HashSet<>();
        for(int shift = 0; shift < size; shift++) {
            // Snapshot the current rotation as one of the produced orders.
            varOrders.add( new VariableOrder( rotation.toArray(new String[size]) ) );

            // Move the head variable to the tail for the next rotation.
            rotation.add( rotation.remove(0) );
        }

        return varOrders;
    }
}
package org.apache.rya.indexing.pcj.storage.accumulo;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Collection;
import java.util.Iterator;

import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.Immutable;

import org.openrdf.query.BindingSet;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

/**
 * An ordered list of {@link BindingSet} variable names. These are used to
 * specify the order {@link Binding}s within the set are serialized to Accumulo.
 * This order effects which rows a prefix scan will hit.
 */
@Immutable
@ParametersAreNonnullByDefault
public final class VariableOrder implements Iterable<String> {

    /** Separates the variable names within the String form of a variable order. */
    public static final String VAR_ORDER_DELIM = ";";

    private final ImmutableList<String> variableOrder;

    /**
     * Constructs an instance of {@link VariableOrder}.
     *
     * @param varOrder - An ordered array of Binding Set variables. (not null)
     */
    public VariableOrder(final String... varOrder) {
        checkNotNull(varOrder);
        variableOrder = ImmutableList.copyOf(varOrder);
    }

    /**
     * Constructs an instance of {@link VariableOrder}.
     *
     * @param varOrder - An ordered collection of Binding Set variables. (not null)
     */
    public VariableOrder(final Collection<String> varOrder) {
        checkNotNull(varOrder);
        variableOrder = ImmutableList.copyOf(varOrder);
    }

    /**
     * Constructs an instance of {@link VariableOrder} by splitting a
     * {@value #VAR_ORDER_DELIM}-delimited String.
     *
     * @param varOrderString - The String representation of a VariableOrder. (not null)
     */
    public VariableOrder(final String varOrderString) {
        checkNotNull(varOrderString);
        variableOrder = ImmutableList.copyOf( varOrderString.split(VAR_ORDER_DELIM) );
    }

    /**
     * @return An ordered list of Binding Set variables.
     */
    public ImmutableList<String> getVariableOrders() {
        return variableOrder;
    }

    /**
     * @return The variable order as an ordered array of Strings. This array is mutable.
     */
    public String[] toArray() {
        final String[] array = new String[ variableOrder.size() ];
        return variableOrder.toArray( array );
    }

    @Override
    public String toString() {
        return Joiner.on(VAR_ORDER_DELIM).join(variableOrder);
    }

    @Override
    public int hashCode() {
        return variableOrder.hashCode();
    }

    @Override
    public boolean equals(final Object o) {
        if(this == o) {
            return true;
        } else if(o instanceof VariableOrder) {
            final VariableOrder varOrder = (VariableOrder) o;
            return variableOrder.equals( varOrder.variableOrder );
        }
        return false;
    }

    @Override
    public Iterator<String> iterator() {
        return variableOrder.iterator();
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.query.BindingSet; + +/** + * Decorates a {@link BindingSet} with a collection of visibilities. + */ +@ParametersAreNonnullByDefault +public class VisibilityBindingSet extends BindingSetDecorator { + private static final long serialVersionUID = 1L; + private final String visibility; + private volatile int hashCode; + + /** + * @param set - Decorates the {@link BindingSet} with no visibilities. 
+ */ + public VisibilityBindingSet(final BindingSet set) { + this(set, ""); + } + + /** + * Creates a new {@link VisibilityBindingSet} + * @param set - The {@link BindingSet} to decorate + * @param visibility - The visibilities on the {@link BindingSet} (not null) + */ + public VisibilityBindingSet(final BindingSet set, final String visibility) { + super(set); + this.visibility = checkNotNull(visibility); + } + + /** + * @return - The Visibilities on the {@link BindingSet} + */ + public String getVisibility() { + return visibility; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } else if(o instanceof VisibilityBindingSet) { + final VisibilityBindingSet other = (VisibilityBindingSet) o; + return set.equals(other) && visibility.equals(other.getVisibility()); + } + return false; + } + + @Override + public int hashCode() { + int result = hashCode; + if(result == 0) { + result = 31 * result + visibility.hashCode(); + result = 31 * result + super.hashCode(); + hashCode = result; + } + return result; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("\n Visibility: " + getVisibility() + "\n"); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java new file mode 100644 index 0000000..8ff01ac --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetStringConverter.java 
@@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.query.BindingSet; + +import com.google.common.base.Strings; + +/** + * Converts {@link BindingSet}s to Strings and back again. The Strings do not + * include the binding names and are ordered with a {@link VariableOrder}. 
+ */ +@ParametersAreNonnullByDefault +public class VisibilityBindingSetStringConverter extends BindingSetStringConverter { + public static final char VISIBILITY_DELIM = 1; + + @Override + public String convert(final BindingSet bindingSet, final VariableOrder varOrder) { + String visibility = ""; + if(bindingSet instanceof VisibilityBindingSet) { + final VisibilityBindingSet visiSet = (VisibilityBindingSet) bindingSet; + if(!Strings.isNullOrEmpty(visiSet.getVisibility())) { + visibility = VISIBILITY_DELIM + visiSet.getVisibility(); + } + } + return super.convert(bindingSet, varOrder) + visibility; + } + + @Override + public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) { + final String[] visiStrings = bindingSetString.split("" + VISIBILITY_DELIM); + BindingSet bindingSet = super.convert(visiStrings[0], varOrder); + + if(visiStrings.length > 1) { + bindingSet = new VisibilityBindingSet(bindingSet, visiStrings[1]); + } else { + bindingSet = new VisibilityBindingSet(bindingSet); + } + return bindingSet; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java new file mode 100644 index 0000000..5cf01c5 --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.update; + +import java.util.Collection; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.storage.PcjException; + +import mvm.rya.api.domain.RyaStatement; + +/** + * Updates the state of all PCJ indices whenever {@link RyaStatement}s are + * added to or removed from the system. + */ +@ParametersAreNonnullByDefault +public interface PrecomputedJoinUpdater { + + /** + * The PCJ indices will be updated to include new statements within + * their results. + * + * @param statements - The statements that will be used to updated the index. (not null) + * @throws PcjUpdateException The statements could not be added to the index. + */ + public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException; + + /** + * The PCJ indices will be updated to remove any results that are + * derived from the provided statements. + * </p> + * A result will only be deleted from the index if all statements + * it is derived from are removed. For example, suppose the following + * instructions execute: + * <pre> + * Insert Statement A + * Insert Statement B + * Insert Statement C + * A and B Join to create Result A + * B and C Join to create Result A again + * Delete Statement A + * </pre> + * Result A will remain in the index because B and C have not been + * delete. 
However, If either B or C are deleted, then the result will + * also be deleted because it can no longer be derived from the remaining + * information. + * + * @param statements - The statements that will be used to updated the index. (not null) + * @throws PcjUpdateException The statements could not be removed from the index. + */ + public void deleteStatements(Collection<RyaStatement> statements) throws PcjUpdateException; + + /** + * If the updater does any batching, then this will force it to flush immediately. + * + * @throws PcjUpdateException The updater could not be flushed. + */ + public void flush() throws PcjUpdateException; + + /** + * Cleans up any resources required to perform the updates (sockets, streams, etc). + * + * @throws PcjUpdateException The updater could not be closed. + */ + public void close() throws PcjUpdateException; + + /** + * An operation of {@link PrecomputedJoinUpdater} failed. + */ + public static class PcjUpdateException extends PcjException { + private static final long serialVersionUID = 1L; + + /** + * Constructs a new exception with the specified detail message. The cause + * is not initialized, and may subsequently be initialized by a call to + * {@link Throwable#initCause(java.lang.Throwable)}. + * + * @param message - The detail message. The detail message is saved for + * later retrieval by the {@link Throwable#getMessage()} method. + */ + public PcjUpdateException(final String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and cause. + * </p> + * Note that the detail message associated with cause is not automatically + * incorporated in this exception's detail message. + * + * @param message - The detail message (which is saved for later retrieval + * by the {@link Throwable#getMessage()} method). + * @param cause - The cause (which is saved for later retrieval by the + * {@link Throwable#getCause()} method). 
(A null value is permitted, and + * indicates that the cause is nonexistent or unknown.) + */ + public PcjUpdateException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file
