http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java deleted file mode 100644 index 5581e08..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java +++ /dev/null @@ -1,38 +0,0 @@ -package mvm.rya.accumulo.experimental; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.IOException; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MultiTableBatchWriter; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.persist.index.RyaSecondaryIndexer; - -public interface AccumuloIndexer extends RyaSecondaryIndexer { - public void init(); - public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException; - public void setConnector(Connector connector); - public void destroy(); - public void purge(RdfCloudTripleStoreConfiguration configuration); - public void dropAndDestroy(); -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java deleted file mode 100644 index 6e818b3..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java +++ /dev/null @@ -1,229 +0,0 @@ -package mvm.rya.accumulo.instance; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import static java.util.Objects.requireNonNull; - -import java.util.Map.Entry; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ConditionalWriter; -import org.apache.accumulo.core.client.ConditionalWriter.Result; -import org.apache.accumulo.core.client.ConditionalWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Condition; -import org.apache.accumulo.core.data.ConditionalMutation; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; - -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetailsRepository; - -/** - * An implementation of {@link RyaDetailsRepository} that stores a Rya - * instance's {@link RyaDetails} in an Accumulo table. - * </p> - * XXX - * This implementation writes the details object as a serialized byte array to - * a row in Accumulo. Storing the entire structure within a single value is - * attractive because Accumulo's conditional writer will let us do checkAndSet - * style operations to synchronize writes to the object. On the downside, only - * Java clients will work. - */ -@ParametersAreNonnullByDefault -public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepository { - - public static final String INSTANCE_DETAILS_TABLE_NAME = "instance_details"; - - private static final Text ROW_ID = new Text("instance metadata"); - private static final Text COL_FAMILY = new Text("instance"); - private static final Text COL_QUALIFIER = new Text("details"); - - private final RyaDetailsSerializer serializer = new RyaDetailsSerializer(); - - private final Connector connector; - private final String instanceName; - private final String detailsTableName; - - - /** - * Constructs an instance of {@link AccumuloRyaInstanceDetailsRepository}. - * - * @param connector - Connects to the instance of Accumulo that hosts the Rya instance. (not null) - * @param instanceName - The name of the Rya instance this repository represents. (not null) - */ - public AccumuloRyaInstanceDetailsRepository(final Connector connector, final String instanceName) { - this.connector = requireNonNull( connector ); - this.instanceName = requireNonNull( instanceName ); - this.detailsTableName = instanceName + INSTANCE_DETAILS_TABLE_NAME; - } - - @Override - public boolean isInitialized() throws RyaDetailsRepositoryException { - try { - final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); - scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); - return scanner.iterator().hasNext(); - } catch (final TableNotFoundException e) { - return false; - } - } - - @Override - public void initialize(final RyaDetails details) throws AlreadyInitializedException, RyaDetailsRepositoryException { - // Preconditions. - requireNonNull( details ); - - if(!details.getRyaInstanceName().equals( instanceName )) { - throw new RyaDetailsRepositoryException("The instance name that was in the provided 'details' does not match " + - "the instance name that this repository is connected to. Make sure you're connected to the" + - "correct Rya instance."); - } - - if(isInitialized()) { - throw new AlreadyInitializedException("The repository has already been initialized for the Rya instance named '" + - instanceName + "'."); - } - - // Create the table that hosts the details if it has not been created yet. - final TableOperations tableOps = connector.tableOperations(); - if(!tableOps.exists(detailsTableName)) { - try { - tableOps.create(detailsTableName); - } catch (AccumuloException | AccumuloSecurityException | TableExistsException e) { - throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + - instanceName + "' because the the table that holds that information could not be created."); - } - } - - // Write the details to the table. - BatchWriter writer = null; - try { - writer = connector.createBatchWriter(detailsTableName, new BatchWriterConfig()); - - final byte[] bytes = serializer.serialize(details); - final Mutation mutation = new Mutation(ROW_ID); - mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(bytes)); - writer.addMutation( mutation ); - - } catch (final TableNotFoundException | MutationsRejectedException e) { - throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e); - } finally { - if(writer != null) { - try { - writer.close(); - } catch (final MutationsRejectedException e) { - throw new RyaDetailsRepositoryException("Could not initialize the Rya instance details for the instance named '" + instanceName + "'.", e); - } - } - } - } - - @Override - public RyaDetails getRyaInstanceDetails() throws NotInitializedException, RyaDetailsRepositoryException { - // Preconditions. - if(!isInitialized()) { - throw new NotInitializedException("Could not fetch the details for the Rya instanced named '" + - instanceName + "' because it has not been initialized yet."); - } - - // Read it from the table. - try { - // Fetch the value from the table. - final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); - scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); - final Entry<Key, Value> entry = scanner.iterator().next(); - - // Deserialize it. - final byte[] bytes = entry.getValue().get(); - return serializer.deserialize( bytes ); - - } catch (final TableNotFoundException e) { - throw new RyaDetailsRepositoryException("Could not get the details from the table.", e); - } - } - - @Override - public void update(final RyaDetails oldDetails, final RyaDetails newDetails) - throws NotInitializedException, ConcurrentUpdateException, RyaDetailsRepositoryException { - // Preconditions. - requireNonNull(oldDetails); - requireNonNull(newDetails); - - if(!newDetails.getRyaInstanceName().equals( instanceName )) { - throw new RyaDetailsRepositoryException("The instance name that was in the provided 'newDetails' does not match " + - "the instance name that this repository is connected to. Make sure you're connected to the" + - "correct Rya instance."); - } - - if(!isInitialized()) { - throw new NotInitializedException("Could not update the details for the Rya instanced named '" + - instanceName + "' because it has not been initialized yet."); - } - - // Use a conditional writer so that we can detect when the old details - // are no longer the currently stored ones. - ConditionalWriter writer = null; - try { - // Setup the condition that ensures the details have not changed since the edits were made. - final byte[] oldDetailsBytes = serializer.serialize(oldDetails); - final Condition condition = new Condition(COL_FAMILY, COL_QUALIFIER); - condition.setValue( oldDetailsBytes ); - - // Create the mutation that only performs the update if the details haven't changed. - final ConditionalMutation mutation = new ConditionalMutation(ROW_ID); - mutation.addCondition( condition ); - final byte[] newDetailsBytes = serializer.serialize(newDetails); - mutation.put(COL_FAMILY, COL_QUALIFIER, new Value(newDetailsBytes)); - - // Do the write. - writer = connector.createConditionalWriter(detailsTableName, new ConditionalWriterConfig()); - final Result result = writer.write(mutation); - switch(result.getStatus()) { - case REJECTED: - case VIOLATED: - throw new ConcurrentUpdateException("Could not update the details for the Rya instance named '" + - instanceName + "' because the old value is out of date."); - case UNKNOWN: - case INVISIBLE_VISIBILITY: - throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'."); - } - } catch (final TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new RyaDetailsRepositoryException("Could not update the details for the Rya instance named '" + instanceName + "'."); - } finally { - if(writer != null) { - writer.close(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java deleted file mode 100644 index 8c863ea..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/instance/RyaDetailsSerializer.java +++ /dev/null @@ -1,96 +0,0 @@ -package mvm.rya.accumulo.instance; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import javax.annotation.ParametersAreNonnullByDefault; - -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; - -/** - * Serializes {@link RyaDetails} instances. - */ -@ParametersAreNonnullByDefault -public class RyaDetailsSerializer { - - /** - * Serializes an instance of {@link RyaDetails}. - * - * @param details - The details that will be serialized. (not null) - * @return The serialized details. - */ - public byte[] serialize(final RyaDetails details) throws SerializationException { - requireNonNull(details); - - try { - final ByteArrayOutputStream stream = new ByteArrayOutputStream(); - new ObjectOutputStream(stream).writeObject( details ); - return stream.toByteArray(); - } catch (final IOException e) { - throw new SerializationException("Could not serialize an instance of RyaDetails.", e); - } - } - - /** - * Deserializes an instance of {@link RyaDetails}. - * - * @param bytes - The serialized for of a {@link RyaDetails}. (not null) - * @return The deserialized object. - */ - public RyaDetails deserialize(final byte[] bytes) throws SerializationException { - requireNonNull(bytes); - - try { - final ByteArrayInputStream stream = new ByteArrayInputStream( bytes ); - final Object o = new ObjectInputStream( stream ).readObject(); - - if(! (o instanceof RyaDetails) ) { - throw new SerializationException("Wrong type of object was deserialized. Class: " + o.getClass().getName() ); - } - - return (RyaDetails) o; - } catch (final ClassNotFoundException | IOException e) { - throw new SerializationException("Could not deserialize an instance of RyaDetails.", e); - } - } - - /** - * Could not serialize an instance of {@link RyaDetails}. - */ - public static class SerializationException extends RyaDetailsRepositoryException { - private static final long serialVersionUID = 1L; - - public SerializationException(final String message) { - super(message); - } - - public SerializationException(final String message, final Throwable cause) { - super(message, cause); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java deleted file mode 100644 index ba3ffd2..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java +++ /dev/null @@ -1,410 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable; -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaRange; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.query.BatchRyaQuery; -import mvm.rya.api.persist.query.RyaQuery; -import mvm.rya.api.persist.query.RyaQueryEngine; -import mvm.rya.api.query.strategy.ByteRange; -import mvm.rya.api.query.strategy.TriplePatternStrategy; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRowRegex; -import mvm.rya.api.utils.CloseableIterableIteration; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.RegExFilter; -import org.apache.accumulo.core.iterators.user.TimestampFilter; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.calrissian.mango.collect.CloseableIterable; -import org.calrissian.mango.collect.CloseableIterables; -import org.calrissian.mango.collect.FluentCloseableIterable; -import org.openrdf.query.BindingSet; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterators; - -/** - * Date: 7/17/12 - * Time: 9:28 AM - */ -public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfiguration> { - - private AccumuloRdfConfiguration configuration; - private Connector connector; - private RyaTripleContext ryaContext; - private final Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, KeyValueToRyaStatementFunction>(); - - public AccumuloRyaQueryEngine(Connector connector) { - this(connector, new AccumuloRdfConfiguration()); - } - - public AccumuloRyaQueryEngine(Connector connector, AccumuloRdfConfiguration conf) { - this.connector = connector; - this.configuration = conf; - ryaContext = RyaTripleContext.getInstance(conf); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO, ryaContext)); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO, ryaContext)); - keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP, ryaContext)); - } - - @Override - public CloseableIteration<RyaStatement, RyaDAOException> query(RyaStatement stmt, AccumuloRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - - RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build(); - CloseableIterable<RyaStatement> results = query(ryaQuery); - - return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results); - } - - protected String getData(RyaType ryaType) { - return (ryaType != null) ? (ryaType.getData()) : (null); - } - - @Override - public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, BindingSet>> stmts, AccumuloRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - //query configuration - Authorizations authorizations = conf.getAuthorizations(); - Long ttl = conf.getTtl(); - Long maxResults = conf.getLimit(); - Integer maxRanges = conf.getMaxRangesForScanner(); - Integer numThreads = conf.getNumThreads(); - - //TODO: cannot span multiple tables here - try { - Collection<Range> ranges = new HashSet<Range>(); - RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); - TABLE_LAYOUT layout = null; - RyaURI context = null; - TriplePatternStrategy strategy = null; - for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) { - RyaStatement stmt = stmtbs.getKey(); - context = stmt.getContext(); //TODO: This will be overwritten - BindingSet bs = stmtbs.getValue(); - strategy = ryaContext.retrieveStrategy(stmt); - if (strategy == null) { - throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); - } - - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf); - - //use range to set scanner - //populate scanner based on authorizations, ttl - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - ranges.add(range); - rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs)); - } - //no ranges - if (layout == null) return null; - String regexSubject = conf.getRegexSubject(); - String regexPredicate = conf.getRegexPredicate(); - String regexObject = conf.getRegexObject(); - TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null); - - String table = layoutToTable(layout, conf); - boolean useBatchScanner = ranges.size() > maxRanges; - RyaStatementBindingSetKeyValueIterator iterator = null; - if (useBatchScanner) { - ScannerBase scanner = connector.createBatchScanner(table, authorizations, numThreads); - ((BatchScanner) scanner).setRanges(ranges); - fillScanner(scanner, context, null, ttl, null, tripleRowRegex, conf); - iterator = new RyaStatementBindingSetKeyValueIterator(layout, ryaContext, scanner, rangeMap); - } else { - Scanner scannerBase = null; - Iterator<Map.Entry<Key, Value>>[] iters = new Iterator[ranges.size()]; - int i = 0; - for (Range range : ranges) { - scannerBase = connector.createScanner(table, authorizations); - scannerBase.setRange(range); - fillScanner(scannerBase, context, null, ttl, null, tripleRowRegex, conf); - iters[i] = scannerBase.iterator(); - i++; - } - iterator = new RyaStatementBindingSetKeyValueIterator(layout, Iterators.concat(iters), rangeMap, ryaContext); - } - if (maxResults != null) { - iterator.setMaxResults(maxResults); - } - return iterator; - } catch (Exception e) { - throw new RyaDAOException(e); - } - - } - - @Override - public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(Collection<RyaStatement> stmts, AccumuloRdfConfiguration conf) - throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - - BatchRyaQuery batchRyaQuery = BatchRyaQuery.builder(stmts).load(conf).build(); - CloseableIterable<RyaStatement> results = query(batchRyaQuery); - - return new CloseableIterableIteration<RyaStatement, RyaDAOException>(results); - } - - @Override - public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws RyaDAOException { - Preconditions.checkNotNull(ryaQuery); - RyaStatement stmt = ryaQuery.getQuery(); - Preconditions.checkNotNull(stmt); - - //query configuration - String[] auths = ryaQuery.getAuths(); - Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations(); - Long ttl = ryaQuery.getTtl(); - Long currentTime = ryaQuery.getCurrentTime(); - Long maxResults = ryaQuery.getMaxResults(); - Integer batchSize = ryaQuery.getBatchSize(); - String regexSubject = ryaQuery.getRegexSubject(); - String regexPredicate = ryaQuery.getRegexPredicate(); - String regexObject = ryaQuery.getRegexObject(); - TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); - - try { - //find triple pattern range - TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt); - TABLE_LAYOUT layout; - Range range; - RyaURI subject = stmt.getSubject(); - RyaURI predicate = stmt.getPredicate(); - RyaType object = stmt.getObject(); - RyaURI context = stmt.getContext(); - String qualifier = stmt.getQualifer(); - TripleRowRegex tripleRowRegex = null; - if (strategy != null) { - //otherwise, full table scan is supported - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(subject, predicate, object, context, null); - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - - } else { - range = new Range(); - layout = TABLE_LAYOUT.SPO; - strategy = ryaContext.retrieveStrategy(layout); - } - - byte[] objectTypeInfo = null; - if (object != null) { - //TODO: Not good to serialize this twice - if (object instanceof RyaRange) { - objectTypeInfo = RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1]; - } else { - objectTypeInfo = RyaContext.getInstance().serializeType(object)[1]; - } - } - - tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, objectTypeInfo); - - //use range to set scanner - //populate scanner based on authorizations, ttl - String table = layoutToTable(layout, tableLayoutStrategy); - Scanner scanner = connector.createScanner(table, authorizations); - scanner.setRange(range); - if (batchSize != null) { - scanner.setBatchSize(batchSize); - } - fillScanner(scanner, context, qualifier, ttl, currentTime, tripleRowRegex, ryaQuery.getConf()); - - FluentCloseableIterable<RyaStatement> results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)) - .transform(keyValueToRyaStatementFunctionMap.get(layout)); - if (maxResults != null) { - results = results.limit(maxResults.intValue()); - } - - return results; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) throws RyaDAOException { - Preconditions.checkNotNull(ryaQuery); - Iterable<RyaStatement> stmts = ryaQuery.getQueries(); - Preconditions.checkNotNull(stmts); - - //query configuration - String[] auths = ryaQuery.getAuths(); - final Authorizations authorizations = auths != null ? new Authorizations(auths) : configuration.getAuthorizations(); - final Long ttl = ryaQuery.getTtl(); - Long currentTime = ryaQuery.getCurrentTime(); - Long maxResults = ryaQuery.getMaxResults(); - Integer batchSize = ryaQuery.getBatchSize(); - Integer numQueryThreads = ryaQuery.getNumQueryThreads(); - String regexSubject = ryaQuery.getRegexSubject(); - String regexPredicate = ryaQuery.getRegexPredicate(); - String regexObject = ryaQuery.getRegexObject(); - TableLayoutStrategy tableLayoutStrategy = configuration.getTableLayoutStrategy(); - int maxRanges = ryaQuery.getMaxRanges(); - - //TODO: cannot span multiple tables here - try { - Collection<Range> ranges = new HashSet<Range>(); - TABLE_LAYOUT layout = null; - RyaURI context = null; - TriplePatternStrategy strategy = null; - for (RyaStatement stmt : stmts) { - context = stmt.getContext(); //TODO: This will be overwritten - strategy = ryaContext.retrieveStrategy(stmt); - if (strategy == null) { - throw new IllegalArgumentException("TriplePattern[" + stmt + "] not supported"); - } - - Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, ByteRange> entry = - strategy.defineRange(stmt.getSubject(), stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null); - - //use range to set scanner - //populate scanner based on authorizations, ttl - layout = entry.getKey(); - ByteRange byteRange = entry.getValue(); - Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); - ranges.add(range); - } - //no ranges - if (layout == null) throw new IllegalArgumentException("No table layout specified"); - - final TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null); - - final String table = layoutToTable(layout, tableLayoutStrategy); - boolean useBatchScanner = ranges.size() > maxRanges; - FluentCloseableIterable<RyaStatement> results = null; - if (useBatchScanner) { - BatchScanner scanner = connector.createBatchScanner(table, authorizations, numQueryThreads); - scanner.setRanges(ranges); - fillScanner(scanner, context, null, ttl, null, tripleRowRegex, ryaQuery.getConf()); - results = FluentCloseableIterable.from(new ScannerBaseCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout)); - } else { - final RyaURI fcontext = context; - final RdfCloudTripleStoreConfiguration fconf = ryaQuery.getConf(); - FluentIterable<RyaStatement> fluent = FluentIterable.from(ranges).transformAndConcat(new Function<Range, Iterable<Map.Entry<Key, Value>>>() { - @Override - public Iterable<Map.Entry<Key, Value>> apply(Range range) { - try { - Scanner scanner = connector.createScanner(table, authorizations); - scanner.setRange(range); - fillScanner(scanner, fcontext, null, ttl, null, tripleRowRegex, fconf); - return scanner; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }).transform(keyValueToRyaStatementFunctionMap.get(layout)); - - results = FluentCloseableIterable.from(CloseableIterables.wrap(fluent)); - } - if (maxResults != null) { - results = results.limit(maxResults.intValue()); - } - return results; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - protected void fillScanner(ScannerBase scanner, RyaURI context, String qualifier, Long ttl, Long currentTime, TripleRowRegex tripleRowRegex, RdfCloudTripleStoreConfiguration conf) throws IOException { - if (context != null && qualifier != null) { - scanner.fetchColumn(new Text(context.getData()), new Text(qualifier)); - } else if (context != null) { - scanner.fetchColumnFamily(new Text(context.getData())); - } else if (qualifier != null) { - IteratorSetting setting = new IteratorSetting(8, "riq", RegExFilter.class.getName()); - RegExFilter.setRegexs(setting, null, null, qualifier, null, false); - scanner.addScanIterator(setting); - } - if (ttl != null) { - IteratorSetting setting = new IteratorSetting(9, "fi", TimestampFilter.class.getName()); - TimestampFilter.setStart(setting, System.currentTimeMillis() - ttl, true); - if(currentTime != null){ - TimestampFilter.setStart(setting, currentTime - ttl, true); - TimestampFilter.setEnd(setting, currentTime, true); - } - scanner.addScanIterator(setting); - } - if (tripleRowRegex != null) { - IteratorSetting setting = new IteratorSetting(11, "ri", RegExFilter.class.getName()); - String regex = tripleRowRegex.getRow(); - RegExFilter.setRegexs(setting, regex, null, null, null, false); - scanner.addScanIterator(setting); - } - if (conf instanceof AccumuloRdfConfiguration) { - //TODO should we take the iterator settings as is or should we adjust the priority based on the above? - for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) { - scanner.addScanIterator(itr); - } - } - } - - @Override - public void setConf(AccumuloRdfConfiguration conf) { - this.configuration = conf; - } - - @Override - public AccumuloRdfConfiguration getConf() { - return configuration; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java deleted file mode 100644 index 2813438..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/KeyValueToRyaStatementFunction.java +++ /dev/null @@ -1,72 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; - -import com.google.common.base.Function; - -/** - * Date: 1/30/13 - * Time: 2:09 PM - */ -public class KeyValueToRyaStatementFunction implements Function<Map.Entry<Key, Value>, RyaStatement> { - - private TABLE_LAYOUT tableLayout; - private RyaTripleContext context; - - public KeyValueToRyaStatementFunction(TABLE_LAYOUT tableLayout, RyaTripleContext context) { - this.tableLayout = tableLayout; - this.context = context; - } - - @Override - public RyaStatement apply(Map.Entry<Key, Value> input) { - Key key = input.getKey(); - Value value = input.getValue(); - RyaStatement statement = null; - try { - statement = context.deserializeTriple(tableLayout, - new TripleRow(key.getRowData().toArray(), - key.getColumnFamilyData().toArray(), - key.getColumnQualifierData().toArray(), - key.getTimestamp(), - key.getColumnVisibilityData().toArray(), - (value != null) ? value.get() : null - )); - } catch (TripleRowResolverException e) { - throw new RuntimeException(e); - } - - return statement; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java deleted file mode 100644 index c59cb87..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java +++ /dev/null @@ -1,58 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.openrdf.query.BindingSet; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -/** - * Class RangeBindingSetCollection - * Date: Feb 23, 2011 - * Time: 10:15:48 AM - */ -public class RangeBindingSetEntries { - public Collection<Map.Entry<Range, BindingSet>> ranges; - - public RangeBindingSetEntries() { - this(new ArrayList<Map.Entry<Range, BindingSet>>()); - } - - public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> ranges) { - this.ranges = ranges; - } - - public Collection<BindingSet> containsKey(Key key) { - //TODO: need to find a better way to sort these and pull - //TODO: maybe fork/join here - Collection<BindingSet> bss = new ArrayList<BindingSet>(); - for (Map.Entry<Range, BindingSet> entry : ranges) { - if (entry.getKey().contains(key)) - bss.add(entry.getValue()); - } - return bss; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java deleted file mode 100644 index b4333bd..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java +++ /dev/null @@ -1,154 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; - -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.openrdf.query.BindingSet; - -/** - * Date: 7/17/12 - * Time: 11:48 AM - */ -public class RyaStatementBindingSetKeyValueIterator implements CloseableIteration<Map.Entry<RyaStatement, BindingSet>, RyaDAOException> { - private Iterator<Map.Entry<Key, Value>> dataIterator; - private TABLE_LAYOUT tableLayout; - private Long maxResults = -1L; - private ScannerBase scanner; - private boolean isBatchScanner; - private RangeBindingSetEntries rangeMap; - private Iterator<BindingSet> bsIter; - private RyaStatement statement; - private RyaTripleContext ryaContext; - - public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, ScannerBase scannerBase, RangeBindingSetEntries rangeMap) { - this(tableLayout, ((scannerBase instanceof BatchScanner) ? ((BatchScanner) scannerBase).iterator() : ((Scanner) scannerBase).iterator()), rangeMap, context); - this.scanner = scannerBase; - isBatchScanner = scanner instanceof BatchScanner; - } - - public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, Iterator<Map.Entry<Key, Value>> dataIterator, RangeBindingSetEntries rangeMap, RyaTripleContext ryaContext) { - this.tableLayout = tableLayout; - this.rangeMap = rangeMap; - this.dataIterator = dataIterator; - this.ryaContext = ryaContext; - } - - @Override - public void close() throws RyaDAOException { - dataIterator = null; - if (scanner != null && isBatchScanner) { - ((BatchScanner) scanner).close(); - } - } - - public boolean isClosed() throws RyaDAOException { - return dataIterator == null; - } - - @Override - public boolean hasNext() throws RyaDAOException { - if (isClosed()) { - return false; - } - if (maxResults != 0) { - if (bsIter != null && bsIter.hasNext()) { - return true; - } - if (dataIterator.hasNext()) { - return true; - } else { - maxResults = 0l; - return false; - } - } - return false; - } - - @Override - public Map.Entry<RyaStatement, BindingSet> next() throws RyaDAOException { - if (!hasNext() || isClosed()) { - throw new NoSuchElementException(); - } - - try { - while (true) { - if (bsIter != null && bsIter.hasNext()) { - maxResults--; - return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, bsIter.next()); - } - - if (dataIterator.hasNext()) { - Map.Entry<Key, Value> next = dataIterator.next(); - Key key = next.getKey(); - statement = ryaContext.deserializeTriple(tableLayout, - new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), - key.getTimestamp(), key.getColumnVisibilityData().toArray(), next.getValue().get())); - if (next.getValue() != null) { - statement.setValue(next.getValue().get()); - } - Collection<BindingSet> bindingSets = rangeMap.containsKey(key); - if (!bindingSets.isEmpty()) { - bsIter = bindingSets.iterator(); - } - } else { - break; - } - } - return null; - } catch (TripleRowResolverException e) { - throw new RyaDAOException(e); - } - } - - @Override - public void remove() throws RyaDAOException { - next(); - } - - public Long getMaxResults() { - return maxResults; - } - - public void setMaxResults(Long maxResults) { - this.maxResults = maxResults; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java deleted file mode 100644 index f4c3081..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java +++ /dev/null @@ -1,107 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Iterator; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; - -/** - * Date: 7/17/12 - * Time: 11:48 AM - */ -public class RyaStatementKeyValueIterator implements CloseableIteration<RyaStatement, RyaDAOException> { - private Iterator<Map.Entry<Key, Value>> dataIterator; - private TABLE_LAYOUT tableLayout; - private Long maxResults = -1L; - private RyaTripleContext context; - - public RyaStatementKeyValueIterator(TABLE_LAYOUT tableLayout, RyaTripleContext context, Iterator<Map.Entry<Key, Value>> dataIterator) { - this.tableLayout = tableLayout; - this.dataIterator = dataIterator; - this.context = context; - } - - @Override - public void close() throws RyaDAOException { - dataIterator = null; - } - - public boolean isClosed() throws RyaDAOException { - return dataIterator == null; - } - - @Override - public boolean hasNext() throws RyaDAOException { - if (isClosed()) { - throw new RyaDAOException("Closed Iterator"); - } - return maxResults != 0 && dataIterator.hasNext(); - } - - @Override - public RyaStatement next() throws RyaDAOException { - if (!hasNext()) { - return null; - } - - try { - Map.Entry<Key, Value> next = dataIterator.next(); - Key key = next.getKey(); - RyaStatement statement = context.deserializeTriple(tableLayout, - new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), - key.getTimestamp(), key.getColumnVisibilityData().toArray(), next.getValue().get())); - if (next.getValue() != null) { - statement.setValue(next.getValue().get()); - } - maxResults--; - return statement; - } catch (TripleRowResolverException e) { - throw new RyaDAOException(e); - } - } - - @Override - public void remove() throws RyaDAOException { - next(); - } - - public Long getMaxResults() { - return maxResults; - } - - public void setMaxResults(Long maxResults) { - this.maxResults = maxResults; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java deleted file mode 100644 index d2dcef9..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java +++ /dev/null @@ -1,56 +0,0 @@ -package mvm.rya.accumulo.query; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import com.google.common.base.Preconditions; -import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.calrissian.mango.collect.AbstractCloseableIterable; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -/** - * Date: 1/30/13 - * Time: 2:15 PM - */ -public class ScannerBaseCloseableIterable extends AbstractCloseableIterable<Map.Entry<Key, Value>> { - - protected ScannerBase scanner; - - public ScannerBaseCloseableIterable(ScannerBase scanner) { - Preconditions.checkNotNull(scanner); - this.scanner = scanner; - } - - @Override - protected void doClose() throws IOException { - scanner.close(); - } - - @Override - protected Iterator<Map.Entry<Key, Value>> retrieveIterator() { - return scanner.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java deleted file mode 100644 index 97d2f54..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java +++ /dev/null @@ -1,87 +0,0 @@ -package mvm.rya.accumulo.utils; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Filter; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.OptionDescriber; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; - -import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; - -/** - * Set the startTime and timeRange. The filter will only keyValues that - * are within the range [startTime - timeRange, startTime]. - */ -public class TimeRangeFilter extends Filter { - private long timeRange; - private long startTime; - public static final String TIME_RANGE_PROP = "timeRange"; - public static final String START_TIME_PROP = "startTime"; - - @Override - public boolean accept(Key k, Value v) { - long diff = startTime - k.getTimestamp(); - return !(diff > timeRange || diff < 0); - } - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - if (options == null) { - throw new IllegalArgumentException("options must be set for TimeRangeFilter"); - } - - timeRange = -1; - String timeRange_s = options.get(TIME_RANGE_PROP); - if (timeRange_s == null) - throw new IllegalArgumentException("timeRange must be set for TimeRangeFilter"); - - timeRange = Long.parseLong(timeRange_s); - - String time = options.get(START_TIME_PROP); - if (time != null) - startTime = Long.parseLong(time); - else - startTime = System.currentTimeMillis(); - } - - @Override - public OptionDescriber.IteratorOptions describeOptions() { - Map<String, String> options = new TreeMap<String, String>(); - options.put(TIME_RANGE_PROP, "time range from the startTime (milliseconds)"); - options.put(START_TIME_PROP, "if set, use the given value as the absolute time in milliseconds as the start time in the time range."); - return new OptionDescriber.IteratorOptions("timeRangeFilter", "TimeRangeFilter removes entries with timestamps outside of the given time range: " + - "[startTime - timeRange, startTime]", - options, null); - } - - @Override - public boolean validateOptions(Map<String, String> options) { - Long.parseLong(options.get(TIME_RANGE_PROP)); - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java deleted file mode 100644 index cc4edca..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/VisibilitySimplifier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.accumulo.utils; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.security.ColumnVisibility; - -import com.google.common.base.Charsets; - -/** - * Simplifies Accumulo visibility expressions. - */ -@ParametersAreNonnullByDefault -public class VisibilitySimplifier { - - /** - * Simplifies an Accumulo visibility expression. - * - * @param visibility - The expression to simplify. (not null) - * @return A simplified form of {@code visibility}. - */ - public String simplify(final String visibility) { - requireNonNull(visibility); - - String last = visibility; - String simplified = new String(new ColumnVisibility(visibility).flatten(), Charsets.UTF_8); - - while(!simplified.equals(last)) { - last = simplified; - simplified = new String(new ColumnVisibility(simplified).flatten(), Charsets.UTF_8); - } - - return simplified; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java new file mode 100644 index 0000000..ebca6a2 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloNamespaceTableIterator.java @@ -0,0 +1,99 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import com.google.common.base.Preconditions; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.persist.RdfDAOException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.openrdf.model.Namespace; +import org.openrdf.model.impl.NamespaceImpl; + +import java.io.IOError; +import java.util.Iterator; +import java.util.Map.Entry; + +public class AccumuloNamespaceTableIterator<T extends Namespace> implements + CloseableIteration<Namespace, RdfDAOException> { + + private boolean open = false; + private Iterator<Entry<Key, Value>> result; + + public AccumuloNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException { + Preconditions.checkNotNull(result); + open = true; + this.result = result; + } + + @Override + public void close() throws RdfDAOException { + try { + verifyIsOpen(); + open = false; + } catch (IOError e) { + throw new RdfDAOException(e); + } + } + + public void verifyIsOpen() throws RdfDAOException { + if (!open) { + throw new RdfDAOException("Iterator not open"); + } + } + + @Override + public boolean hasNext() throws RdfDAOException { + verifyIsOpen(); + return result != null && result.hasNext(); + } + + @Override + public Namespace next() throws RdfDAOException { + if (hasNext()) { + return getNamespace(result); + } + return null; + } + + public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) { + for (; rowResults.hasNext(); ) { + Entry<Key, Value> next = rowResults.next(); + Key key = next.getKey(); + Value val = next.getValue(); + String cf = key.getColumnFamily().toString(); + String cq = key.getColumnQualifier().toString(); + return new NamespaceImpl(key.getRow().toString(), new String( + val.get())); + } + return null; + } + + @Override + public void remove() throws RdfDAOException { + next(); + } + + public boolean isOpen() { + return open; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java new file mode 100644 index 0000000..709ceb9 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java @@ -0,0 +1,158 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +/** + * Created by IntelliJ IDEA. + * Date: 4/25/12 + * Time: 3:24 PM + * To change this template use File | Settings | File Templates. + */ +public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { + + public static final String MAXRANGES_SCANNER = "ac.query.maxranges"; + + public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; + + public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush"; + + public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size"; + public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d."; + public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name"; + public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass"; + public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority"; + public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize"; + public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name"; + public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value"; + + public AccumuloRdfConfiguration() { + super(); + } + + public AccumuloRdfConfiguration(Configuration other) { + super(other); + } + + @Override + public AccumuloRdfConfiguration clone() { + return new AccumuloRdfConfiguration(this); + } + + public Authorizations getAuthorizations() { + String[] auths = getAuths(); + if (auths == null || auths.length == 0) + return AccumuloRdfConstants.ALL_AUTHORIZATIONS; + return new Authorizations(auths); + } + + public void setMaxRangesForScanner(Integer max) { + setInt(MAXRANGES_SCANNER, max); + } + + public Integer getMaxRangesForScanner() { + return getInt(MAXRANGES_SCANNER, 2); + } + + public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) { + List<String> strs = Lists.newArrayList(); + for (Class<? extends AccumuloIndexer> ai : indexers){ + strs.add(ai.getName()); + } + + setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{})); + } + + public List<AccumuloIndexer> getAdditionalIndexers() { + return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class); + } + public boolean flushEachUpdate(){ + return getBoolean(CONF_FLUSH_EACH_UPDATE, true); + } + + public void setFlush(boolean flush){ + setBoolean(CONF_FLUSH_EACH_UPDATE, flush); + } + + public void setAdditionalIterators(IteratorSetting... additionalIterators){ + //TODO do we need to worry about cleaning up + this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length)); + int i = 0; + for(IteratorSetting iterator : additionalIterators) { + this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName()); + this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass()); + this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority())); + Map<String, String> options = iterator.getOptions(); + + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size())); + Iterator<Entry<String, String>> it = options.entrySet().iterator(); + int j = 0; + while(it.hasNext()) { + Entry<String, String> item = it.next(); + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey()); + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue()); + j++; + } + i++; + } + } + + public IteratorSetting[] getAdditionalIterators(){ + int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); + if(size == 0) { + return new IteratorSetting[0]; + } + + IteratorSetting[] settings = new IteratorSetting[size]; + for(int i = 0; i < size; i++) { + String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); + String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); + int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); + + int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); + Map<String, String> options = new HashMap<String, String>(optionsSize); + for(int j = 0; j < optionsSize; j++) { + String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); + String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); + options.put(key, value); + } + settings[i] = new IteratorSetting(priority, name, iteratorClass, options); + } + + return settings; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java new file mode 100644 index 0000000..1ec57a7 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConstants.java @@ -0,0 +1,40 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; + +/** + * Interface AccumuloRdfConstants + * Date: Mar 1, 2012 + * Time: 7:24:52 PM + */ +public interface AccumuloRdfConstants { + public static final Authorizations ALL_AUTHORIZATIONS = Constants.NO_AUTHS; + + public static final Value EMPTY_VALUE = new Value(new byte[0]); + + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java new file mode 100644 index 0000000..a3e0677 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfEvalStatsDAO.java @@ -0,0 +1,173 @@ +package mvm.rya.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreStatement; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RdfDAOException; +import mvm.rya.api.persist.RdfEvalStatsDAO; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Resource; +import org.openrdf.model.Value; + +/** + * Class CloudbaseRdfEvalStatsDAO + * Date: Feb 28, 2012 + * Time: 5:03:16 PM + */ +public class AccumuloRdfEvalStatsDAO implements RdfEvalStatsDAO<AccumuloRdfConfiguration> { + + private boolean initialized = false; + private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + + private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>(); + private Connector connector; + + // private String evalTable = TBL_EVAL; + private TableLayoutStrategy tableLayoutStrategy; + + @Override + public void init() throws RdfDAOException { + try { + if (isInitialized()) { + throw new IllegalStateException("Already initialized"); + } + checkNotNull(connector); + tableLayoutStrategy = conf.getTableLayoutStrategy(); +// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); +// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); + + TableOperations tos = connector.tableOperations(); + AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval()); +// boolean tableExists = tos.exists(evalTable); +// if (!tableExists) +// tos.create(evalTable); + initialized = true; + } catch (Exception e) { + throw new RdfDAOException(e); + } + } + + + @Override + public void destroy() throws RdfDAOException { + if (!isInitialized()) { + throw new IllegalStateException("Not initialized"); + } + initialized = false; + } + + @Override + public boolean isInitialized() throws RdfDAOException { + return initialized; + } + + public Connector getConnector() { + return connector; + } + + public void setConnector(Connector connector) { + this.connector = connector; + } + + public AccumuloRdfConfiguration getConf() { + return conf; + } + + public void setConf(AccumuloRdfConfiguration conf) { + this.conf = conf; + } + + @Override + public double getCardinality(AccumuloRdfConfiguration conf, + mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, + List<Value> val, Resource context) throws RdfDAOException { + try { + Authorizations authorizations = conf.getAuthorizations(); + Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations); + Text cfTxt = null; + if (CARDINALITY_OF.SUBJECT.equals(card)) { + cfTxt = SUBJECT_CF_TXT; + } else if (CARDINALITY_OF.PREDICATE.equals(card)) { + cfTxt = PRED_CF_TXT; + } else if (CARDINALITY_OF.OBJECT.equals(card)) { +// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality + return Double.MAX_VALUE; + } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) { + cfTxt = SUBJECTOBJECT_CF_TXT; + } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) { + cfTxt = SUBJECTPRED_CF_TXT; + } else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) { + cfTxt = PREDOBJECT_CF_TXT; + } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]"); + Text cq = EMPTY_TEXT; + if (context != null) { + cq = new Text(context.stringValue().getBytes()); + } + scanner.fetchColumn(cfTxt, cq); + Iterator<Value> vals = val.iterator(); + String compositeIndex = vals.next().stringValue(); + while (vals.hasNext()){ + compositeIndex += DELIM + vals.next().stringValue(); + } + scanner.setRange(new Range(new Text(compositeIndex.getBytes()))); + Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter = scanner.iterator(); + if (iter.hasNext()) { + return Double.parseDouble(new String(iter.next().getValue().get())); + } + } catch (Exception e) { + throw new RdfDAOException(e); + } + + //default + return -1; + } + + @Override + public double getCardinality(AccumuloRdfConfiguration conf, + mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, + List<Value> val) throws RdfDAOException { + return getCardinality(conf, card, val, null); + } +}
