http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java deleted file mode 100644 index 0c7369c..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java +++ /dev/null @@ -1,267 +0,0 @@ -package mvm.rya.accumulo.pcj.iterators; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.model.Value; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.HashBiMap; - -/** - * This class takes in a {@link Scanner} and a Collection of BindingSets, - * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into - * a {@link BindingSet}, and performs a cross product on the BindingSet with - * each BindingSet in the provided Collection. The user can also specify a - * {@link Map<String, Value>} of constant constraints that can be used to filter. - * - */ -public class PCJKeyToCrossProductBindingSetIterator implements - CloseableIteration<BindingSet, QueryEvaluationException> { - - //BindingSets passed to PCJ used to form cross product - private List<BindingSet> crossProductBs; - //Scanner over PCJ table - private Scanner scanner; - //Iterator over PCJ scanner - private Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iterator; - //Map of PCJ variables in table to variable in query - private Map<String, String> pcjVarMap; - //if PCJ contains LeftJoin, this is a set of variables that only appear in - //LeftJoin. Used when performing the cross product. - private Set<String> unAssuredVariables; - private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); - private Iterator<BindingSet> crossProductIter = Collections.emptyIterator(); - private Map<String, Value> constantConstraints; - private BindingSet next; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private boolean crossProductBsExist = false; - private boolean constantConstraintsExist = false; - - public PCJKeyToCrossProductBindingSetIterator(Scanner scanner, - List<BindingSet> crossProductBs, - Map<String, Value> constantConstraints, Set<String> unAssuredVariables, - Map<String, String> pcjVarMap) { - this.crossProductBs = crossProductBs; - this.scanner = scanner; - this.iterator = scanner.iterator(); - this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); - this.constantConstraints = constantConstraints; - this.crossProductBsExist = crossProductBs.size() > 0; - this.constantConstraintsExist = constantConstraints.size() > 0; - this.unAssuredVariables = unAssuredVariables; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!hasNextCalled && !isEmpty) { - if (crossProductBsExist) { - while (crossProductIter.hasNext() || iterator.hasNext()) { - if (!crossProductIter.hasNext()) { - Key key = iterator.next().getKey(); - try { - crossProductIter = getCrossProducts(getBindingSet(key)); - } catch (BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - } - if (!crossProductIter.hasNext()) { - continue; - } - next = crossProductIter.next(); - hasNextCalled = true; - return true; - } - } else { - while (iterator.hasNext()) { - Key key = iterator.next().getKey(); - try { - next = getBindingSet(key); - } catch (BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - //BindingSet cannot be deserialized or is filtered - //out by constant constraints - if (next == null || next == EMPTY_BINDINGSET) { - continue; - } - hasNextCalled = true; - return true; - } - } - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public BindingSet next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return next; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - scanner.close(); - } - - /** - * - * @param key - * - Accumulo key obtained from scan - * @return - BindingSet satisfying any specified constant constraints - * @throws BindingSetConversionException - * @throws QueryEvaluationException - */ - private BindingSet getBindingSet(Key key) - throws BindingSetConversionException, QueryEvaluationException { - byte[] row = key.getRow().getBytes(); - String[] varOrder = key.getColumnFamily().toString() - .split(ExternalTupleSet.VAR_ORDER_DELIM); - - BindingSet bindingSet = converter.convert(row, new VariableOrder( - varOrder)); - - QueryBindingSet bs = new QueryBindingSet(); - for (String var : bindingSet.getBindingNames()) { - String mappedVar = null; - if(pcjVarMap.containsKey(var)) { - mappedVar = pcjVarMap.get(var); - } else { - throw new QueryEvaluationException("PCJ Variable has no mapping to query variable."); - } - if (constantConstraintsExist) { - if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) - && constantConstraints.containsKey(mappedVar) - && !constantConstraints.get(mappedVar).equals( - bindingSet.getValue(var))) { - return EMPTY_BINDINGSET; - } - } - - if (!mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)) { - bs.addBinding(mappedVar, bindingSet.getValue(var)); - } - } - return bs; - } - - /** - * This method forms the cross-product between an input BindingSet and the - * BindingSets contained in crossProdcutBs. - * - * @param bs - * - {@link BindingSet} used to form cross product with - * cross-product BindingSets - * @return - Iterator over resulting cross-product - */ - private Iterator<BindingSet> getCrossProducts(BindingSet bs) { - Set<BindingSet> crossProducts = new HashSet<BindingSet>(); - - for (BindingSet bSet : crossProductBs) { - BindingSet prod = takeCrossProduct(bSet, bs); - if (prod != EMPTY_BINDINGSET) { - crossProducts.add(prod); - } - } - - return crossProducts.iterator(); - - } - - /** - * This method compute the cross product of the BindingSet passed to the PCJ - * and the PCJ BindingSet. It verifies that only common variables are unassured - * variables, and if leftBs and rightBs have distinct values for a given variable, - * this method uses the value from leftBs in the cross product BindingSet - this - * is effectively performing a LeftJoin. - * - * @param leftBs - BindingSet passed to PCJ - * @param rightBs - PCJ BindingSet - * @return - cross product BindingSet - */ - private BindingSet takeCrossProduct(BindingSet leftBs, BindingSet rightBs) { - if (bindingSetsIntersect(leftBs, rightBs)) { - return EMPTY_BINDINGSET; - } - QueryBindingSet bs = new QueryBindingSet(leftBs); - - //only add Bindings corresponding to variables that have no value - //assigned. This takes into account case where leftBs and rightBs - //share a common, unAssuredVariable. In this case, use value corresponding - //to leftBs, which is effectively performing a LeftJoin. - for(String s: rightBs.getBindingNames()) { - if(bs.getValue(s) == null) { - bs.addBinding(s, rightBs.getValue(s)); - } - } - return bs; - } - - private boolean bindingSetsIntersect(BindingSet bs1, BindingSet bs2) { - - for(String s: bs1.getBindingNames()) { - if(bs2.getValue(s) != null && !unAssuredVariables.contains(s)) { - return true; - } - } - return false; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java deleted file mode 100644 index 1b821d4..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java +++ /dev/null @@ -1,199 +0,0 @@ -package mvm.rya.accumulo.pcj.iterators; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; - -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.model.Value; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashBiMap; - -/** - * This class takes in a {@link Scanner} and a Collection of BindingSets, - * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into a - * {@link BindingSet}, and creates a {@link Map.Entry<String, BindingSet>} - * object to perform as hash join. The user can also specify a {@link Map - * <String, Value>} of constant constraints that can be used to filter. - * - */ -public class PCJKeyToJoinBindingSetIterator - implements - CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> { - - //map of variables as they appear in PCJ table to query variables - private Map<String, String> pcjVarMap; - //constant constraints used for filtering - private Map<String, Value> constantConstraints; - //max number of variables an entry in the batch of BindingSets had in common with PCJ - //this is used for constructing hash join key. - private int maxPrefixLen; - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - private final Map.Entry<String, BindingSet> EMPTY_ENTRY = new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( - "", new QueryBindingSet()); - private Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> iterator; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private Map.Entry<String, BindingSet> next; - private BatchScanner scanner; - - public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, - Map<String, String> pcjVarMap, - Map<String, Value> constantConstraints, int maxPrefixLen) { - Preconditions.checkNotNull(scanner); - Preconditions.checkArgument(pcjVarMap.size() > 0, - "Variable map must contain at least one variable!"); - Preconditions.checkNotNull(constantConstraints, - "Constant constraints cannot be null."); - Preconditions.checkArgument(maxPrefixLen > 0, - "Max prefix length must be greater than 0."); - Preconditions - .checkArgument(maxPrefixLen <= pcjVarMap.size(), - "Max prefix length must be less than total number of binding names."); - this.scanner = scanner; - this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); - this.constantConstraints = constantConstraints; - this.maxPrefixLen = maxPrefixLen; - this.iterator = scanner.iterator(); - - } - - public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, - Map<String, String> pcjVarMap, int maxPrefixLen) { - this(scanner, pcjVarMap, new HashMap<String, Value>(), maxPrefixLen); - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - - if (!hasNextCalled && !isEmpty) { - while (iterator.hasNext()) { - Key key = iterator.next().getKey(); - // get bindings from scan without values associated with - // constant constraints - try { - next = getBindingSetEntryAndMatchConstants(key); - } catch (BindingSetConversionException e) { - throw new QueryEvaluationException( - "Could not deserialize PCJ BindingSet."); - } - // skip key if constant constraint don't match - if (next == EMPTY_ENTRY) { - continue; - } - hasNextCalled = true; - return true; - } - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public Entry<String, BindingSet> next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return next; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - scanner.close(); - } - - /** - * - * @param key - * - Accumulo key obtained from scan - * @return - Entry<String,BindingSet> satisfying the constant constraints - * @throws BindingSetConversionException - */ - private Map.Entry<String, BindingSet> getBindingSetEntryAndMatchConstants( - Key key) throws BindingSetConversionException { - byte[] row = key.getRow().getBytes(); - String[] varOrder = key.getColumnFamily().toString() - .split(ExternalTupleSet.VAR_ORDER_DELIM); - - BindingSet bindingSet = converter.convert(row, new VariableOrder( - varOrder)); - - QueryBindingSet bs = new QueryBindingSet(); - for (String var : bindingSet.getBindingNames()) { - String mappedVar = pcjVarMap.get(var); - if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) - && constantConstraints.containsKey(mappedVar) - && !constantConstraints.get(mappedVar).equals( - bindingSet.getValue(var))) { - return EMPTY_ENTRY; - } else { - bs.addBinding(mappedVar, bindingSet.getValue(var)); - } - } - - String orderedValueString = bindingSet.getValue(varOrder[0]).toString(); - for (int i = 1; i < maxPrefixLen; i++) { - Value value = bindingSet.getValue(varOrder[i]); - if (value != null) { - orderedValueString = orderedValueString - + ExternalTupleSet.VALUE_DELIM + value.toString(); - } - } - - return new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( - orderedValueString, bs); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java deleted file mode 100644 index 53f29f4..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailConnection; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - -import info.aduna.iteration.CloseableIteration; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.client.BatchUpdatePCJ; -import mvm.rya.api.client.InstanceDoesNotExistException; -import mvm.rya.api.client.PCJDoesNotExistException; -import mvm.rya.api.client.RyaClientException; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import mvm.rya.api.instance.RyaDetailsUpdater; -import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; -import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.rdftriplestore.inference.InferenceEngineException; -import mvm.rya.sail.config.RyaSailFactory; - -/** - * Uses an in memory Rya Client to batch update a PCJ index. - */ -public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ { - - private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class); - - /** - * Constructs an instance of {@link AccumuloBatchUpdatePCJ}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - } - - @Override - public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { - requireNonNull(ryaInstanceName); - requireNonNull(pcjId); - verifyPCJState(ryaInstanceName, pcjId); - updatePCJResults(ryaInstanceName, pcjId); - updatePCJMetadata(ryaInstanceName, pcjId); - } - - private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException { - try { - // Fetch the Rya instance's details. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); - final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails(); - - // Ensure PCJs are enabled. - if(!ryaDetails.getPCJIndexDetails().isEnabled()) { - throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'."); - } - - // Ensure the PCJ exists. - if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) { - throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'."); - } - - // Ensure the PCJ is not already being incrementally updated. - final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); - final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy(); - if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) { - throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally."); - } - } catch(final NotInitializedException e) { - throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e); - } catch (final RyaDetailsRepositoryException e) { - throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e); - } - } - - private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { - // Things that have to be closed before we exit. - Sail sail = null; - SailConnection sailConn = null; - CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null; - - try { - // Create an instance of Sail backed by the Rya instance. - sail = connectToRya(ryaInstanceName); - - // Purge the old results from the PCJ. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName); - try { - pcjStorage.purge(pcjId); - } catch (final PCJStorageException e) { - throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " + - "results could not be purged from it.", e); - } - - try { - // Parse the PCJ's SPARQL query. - final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = metadata.getSparql(); - final SPARQLParser parser = new SPARQLParser(); - final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - - // Execute the query. - sailConn = sail.getConnection(); - results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); - - // Load the results into the PCJ table. - final List<VisibilityBindingSet> batch = new ArrayList<>(1000); - - while(results.hasNext()) { - final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), ""); - batch.add(result); - - if(batch.size() == 1000) { - pcjStorage.addResults(pcjId, batch); - batch.clear(); - } - } - - if(!batch.isEmpty()) { - pcjStorage.addResults(pcjId, batch); - batch.clear(); - } - } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) { - throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e); - } - } finally { - if(results != null) { - try { - results.close(); - } catch (final QueryEvaluationException e) { - log.warn(e.getMessage(), e); - } - } - - if(sailConn != null) { - try { - sailConn.close(); - } catch (final SailException e) { - log.warn(e.getMessage(), e); - } - } - - if(sail != null) { - try { - sail.shutDown(); - } catch (final SailException e) { - log.warn(e.getMessage(), e); - } - } - } - } - - private Sail connectToRya(final String ryaInstanceName) throws RyaClientException { - try { - final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails(); - - final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); - ryaConf.setTablePrefix(ryaInstanceName); - ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); - ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); - ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); - ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); - - // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results. - ryaConf.set(ConfigUtils.USE_PCJ, "false"); - - return RyaSailFactory.getInstance(ryaConf); - } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { - throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e); - } - } - - private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException { - // Update the PCJ's metadata to indicate it was just batch updated. - try { - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); - - new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { - @Override - public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { - // Update the original PCJ Details to indicate they were batch updated. - final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); - final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) - .setUpdateStrategy( PCJUpdateStrategy.BATCH ) - .setLastUpdateTime( new Date()); - - // Replace the old PCJ Details with the updated ones. - final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); - builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); - return builder.build(); - } - }); - } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) { - throw new RyaClientException("Could not update the PCJ's metadata.", e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java deleted file mode 100644 index 078e985..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; - -/** - * An abstract class that holds onto Accumulo access information. Extend this - * when implementing a command that interacts with Accumulo. - */ -@ParametersAreNonnullByDefault -public abstract class AccumuloCommand { - - private final AccumuloConnectionDetails connectionDetails; - private final Connector connector; - - /** - * Constructs an instance of {@link AccumuloCommand}. - * - * Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo - * that hosts Rya instance. (not null) - */ - public AccumuloCommand( - final AccumuloConnectionDetails connectionDetails, - final Connector connector) { - this.connectionDetails = requireNonNull( connectionDetails ); - this.connector = requireNonNull(connector); - } - - /** - * @return Details about the values that were used to create the connector to the cluster. (not null) - */ - public AccumuloConnectionDetails getAccumuloConnectionDetails() { - return connectionDetails; - } - - /** - * @return Provides programatic access to the instance of Accumulo that hosts Rya instance. - */ - public Connector getConnector() { - return connector; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java deleted file mode 100644 index c0a7be7..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; -import javax.annotation.concurrent.Immutable; - -/** - * The information that the shell used to connect to Accumulo. - */ -@Immutable -@ParametersAreNonnullByDefault -public class AccumuloConnectionDetails { - private final String username; - private final char[] password; - private final String instanceName; - private final String zookeepers; - - /** - * Constructs an instance of {@link AccumuloConnectionDetails}. - * - * @param username - The username that was used to establish the connection. (not null) - * @param password - The password that was used to establish the connection. (not null) - * @param instanceName - The Accumulo instance name that was used to establish the connection. (not null) - * @param zookeepers - The list of zookeeper hostname that were used to establish the connection. (not null) - */ - public AccumuloConnectionDetails( - final String username, - final char[] password, - final String instanceName, - final String zookeepers) { - this.username = requireNonNull(username); - this.password = requireNonNull(password); - this.instanceName = requireNonNull(instanceName); - this.zookeepers = requireNonNull(zookeepers); - } - - /** - * @return The username that was used to establish the connection. - */ - public String getUsername() { - return username; - } - - /** - * @return The password that was used to establish the connection. - */ - public char[] getPassword() { - return password; - } - - /** - * @return The Accumulo instance name that was used to establish the connection. - */ - public String getInstanceName() { - return instanceName; - } - - /** - * @return The list of zookeeper hostname that were used to establish the connection. - */ - public String getZookeepers() { - return zookeepers; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java deleted file mode 100644 index 4cf0935..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.repository.RepositoryException; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.client.FluoClient; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.client.CreatePCJ; -import mvm.rya.api.client.GetInstanceDetails; -import mvm.rya.api.client.InstanceDoesNotExistException; -import mvm.rya.api.client.RyaClientException; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import mvm.rya.api.instance.RyaDetailsUpdater; -import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; -import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; -import mvm.rya.rdftriplestore.RdfCloudTripleStore; -import mvm.rya.rdftriplestore.RyaSailRepository; - -/** - * An Accumulo implementation of the {@link CreatePCJ} command. - */ -@ParametersAreNonnullByDefault -public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { - - private final GetInstanceDetails getInstanceDetails; - - /** - * Constructs an instance of {@link AccumuloCreatePCJ}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloCreatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); - } - - @Override - public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException { - requireNonNull(instanceName); - requireNonNull(sparql); - - final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName); - final boolean ryaInstanceExists = ryaDetailsHolder.isPresent(); - if(!ryaInstanceExists) { - throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); - } - - final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails(); - final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled(); - if(!pcjIndexingEnabeld) { - throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); - } - - // Create the PCJ table that will receive the index results. - final String pcjId; - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName); - try { - pcjId = pcjStorage.createPcj(sparql); - } catch (final PCJStorageException e) { - throw new RyaClientException("Problem while initializing the PCJ table.", e); - } - - // If a Fluo application is being used, task it with updating the PCJ. - final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); - if(fluoDetailsHolder.isPresent()) { - final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); - try { - updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); - } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { - throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); - } - - // Update the Rya Details to indicate the PCJ is being updated incrementally. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); - try { - new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { - @Override - public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { - // Update the original PCJ Details to indicate they are incrementally updated. - final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); - final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) - .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); - - // Replace the old PCJ Details with the updated ones. - final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); - builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); - return builder.build(); - } - }); - } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { - throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); - } - } - - // Return the ID that was assigned to the PCJ. - return pcjId; - } - - private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException { - requireNonNull(pcjStorage); - requireNonNull(pcjId); - - // Connect to the Fluo application that is updating this instance's PCJs. - final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); - final FluoClient fluoClient = new FluoClientFactory().connect( - cd.getUsername(), - new String(cd.getPassword()), - cd.getInstanceName(), - cd.getZookeepers(), - fluoAppName); - - // Setup the Rya client that is able to talk to scan Rya's statements. - final RyaSailRepository ryaSailRepo = makeRyaRepository(getConnector(), ryaInstance); - - // Initialize the PCJ within the Fluo application. - final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); - fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaSailRepo); - } - - private static RyaSailRepository makeRyaRepository(final Connector connector, final String ryaInstance) throws RepositoryException { - checkNotNull(connector); - checkNotNull(ryaInstance); - - // Setup Rya configuration values. - final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); - ryaConf.setTablePrefix( ryaInstance ); - - // Connect to the Rya repo using the provided Connector. - final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); - accumuloRyaDao.setConnector(connector); - accumuloRyaDao.setConf(ryaConf); - - final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); - ryaStore.setRyaDAO(accumuloRyaDao); - - final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); - ryaRepo.initialize(); - return ryaRepo; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java deleted file mode 100644 index 233a265..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; -import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.client.FluoClient; -import mvm.rya.api.client.DeletePCJ; -import mvm.rya.api.client.GetInstanceDetails; -import mvm.rya.api.client.InstanceDoesNotExistException; -import mvm.rya.api.client.RyaClientException; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; - -/** - * An Accumulo implementation of the {@link DeletePCJ} command. - */ -@ParametersAreNonnullByDefault -public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { - - private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class); - - private final GetInstanceDetails getInstanceDetails; - - /** - * Constructs an instance of {@link AccumuloDeletePCJ}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloDeletePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); - } - - @Override - public void deletePCJ(final String instanceName, final String pcjId) throws InstanceDoesNotExistException, RyaClientException { - requireNonNull(instanceName); - requireNonNull(pcjId); - - final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName); - final boolean ryaInstanceExists = originalDetails.isPresent(); - if(!ryaInstanceExists) { - throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); - } - - final boolean pcjIndexingEnabeld = originalDetails.get().getPCJIndexDetails().isEnabled(); - if(!pcjIndexingEnabeld) { - throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); - } - - final boolean pcjExists = originalDetails.get().getPCJIndexDetails().getPCJDetails().containsKey( pcjId ); - if(!pcjExists) { - throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ with ID '%s'.", instanceName, pcjId)); - } - - // If the PCJ was being maintained by a Fluo application, then stop that process. - final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails(); - final PCJDetails droppedPcjDetails = pcjIndexDetails.getPCJDetails().get( pcjId ); - if(droppedPcjDetails.getUpdateStrategy().isPresent()) { - if(droppedPcjDetails.getUpdateStrategy().get() == PCJUpdateStrategy.INCREMENTAL) { - final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); - - if(fluoDetailsHolder.isPresent()) { - final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName(); - stopUpdatingPCJ(fluoAppName, pcjId); - } else { - log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are " + - "missing for the Rya instance named '%s'.", instanceName)); - } - } - } - - // Drop the table that holds the PCJ results from Accumulo. - final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName); - try { - pcjs.dropPcj(pcjId); - } catch (final PCJStorageException e) { - throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e); - } - } - - private void stopUpdatingPCJ(final String fluoAppName, final String pcjId) { - requireNonNull(fluoAppName); - requireNonNull(pcjId); - - // Connect to the Fluo application that is updating this instance's PCJs. - final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); - final FluoClient fluoClient = new FluoClientFactory().connect( - cd.getUsername(), - new String(cd.getPassword()), - cd.getInstanceName(), - cd.getZookeepers(), - fluoAppName); - - // Delete the PCJ from the Fluo App. - new DeletePcj(1000).deletePcj(fluoClient, pcjId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java deleted file mode 100644 index a2fed78..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; - -import com.google.common.base.Optional; - -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.client.GetInstanceDetails; -import mvm.rya.api.client.InstanceDoesNotExistException; -import mvm.rya.api.client.InstanceExists; -import mvm.rya.api.client.RyaClientException; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; - -/** - * An Accumulo implementation of the {@link GetInstanceDetails} command. - */ -@ParametersAreNonnullByDefault -public class AccumuloGetInstanceDetails extends AccumuloCommand implements GetInstanceDetails { - - private final InstanceExists instanceExists; - - /** - * Constructs an instance of {@link AccumuloGetInstanceDetails}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloGetInstanceDetails(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - instanceExists = new AccumuloInstanceExists(connectionDetails, connector); - } - - @Override - public Optional<RyaDetails> getDetails(final String instanceName) throws InstanceDoesNotExistException, RyaClientException { - requireNonNull(instanceName); - - // Ensure the Rya instance exists. - if(!instanceExists.exists(instanceName)) { - throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", instanceName)); - } - - // If the instance has details, then return them. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); - try { - return Optional.of( detailsRepo.getRyaInstanceDetails() ); - } catch (final NotInitializedException e) { - return Optional.absent(); - } catch (final RyaDetailsRepositoryException e) { - throw new RyaClientException("Could not fetch the Rya instance's details.", e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java deleted file mode 100644 index 08c1932..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import java.util.Date; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.client.Install; -import mvm.rya.api.client.InstanceExists; -import mvm.rya.api.client.RyaClientException; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; -import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; -import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; -import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; -import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; -import mvm.rya.api.instance.RyaDetails.ProspectorDetails; -import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; -import mvm.rya.api.instance.RyaDetailsRepository; -import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; -import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; -import mvm.rya.rdftriplestore.inference.InferenceEngineException; -import mvm.rya.sail.config.RyaSailFactory; - -/** - * An Accumulo implementation of the {@link Install} command. - */ - -@ParametersAreNonnullByDefault -public class AccumuloInstall extends AccumuloCommand implements Install { - - private final InstanceExists instanceExists; - - /** - * Constructs an instance of {@link AccumuloInstall}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloInstall(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - instanceExists = new AccumuloInstanceExists(connectionDetails, connector); - } - - @Override - public void install(final String instanceName, final InstallConfiguration installConfig) throws DuplicateInstanceNameException, RyaClientException { - requireNonNull(instanceName); - requireNonNull(installConfig); - - // Check to see if a Rya instance has already been installed with this name. - if(instanceExists.exists(instanceName)) { - throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + - "with the name '" + instanceName + "'. Try again with a different name."); - } - - // Initialize the Rya Details table. - RyaDetails details; - try { - details = initializeRyaDetails(instanceName, installConfig); - } catch (final AlreadyInitializedException e) { - // This can only happen if somebody else installs an instance of Rya with the name between the check and now. - throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + - "with the name '" + instanceName + "'. Try again with a different name."); - } catch (final RyaDetailsRepositoryException e) { - throw new RyaClientException("The RyaDetails couldn't be initialized. Details: " + e.getMessage(), e); - } - - // Initialize the rest of the tables used by the Rya instance. - final AccumuloRdfConfiguration ryaConfig = makeRyaConfig(getAccumuloConnectionDetails(), details); - try { - final Sail ryaSail = RyaSailFactory.getInstance(ryaConfig); - ryaSail.shutDown(); - } catch (final AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { - throw new RyaClientException("Could not initialize all of the tables for the new Rya instance. " + - "This instance may be left in a bad state.", e); - } catch (final SailException e) { - throw new RyaClientException("Problem shutting down the Sail object used to install Rya.", e); - } - } - - /** - * @return The version of the application as reported by the manifest. - */ - private String getVersion() { - return "" + this.getClass().getPackage().getImplementationVersion(); - } - - /** - * Initializes the {@link RyaDetails} and stores them for the new instance. - * - * @param instanceName - The name of the instance that is being created. (not null) - * @param installConfig - The instance's install configuration. (not null) - * @return The {@link RyaDetails} that were stored. - * @throws AlreadyInitializedException Could not be initialized because - * a table with this instance name has already exists and is holding the details. - * @throws RyaDetailsRepositoryException Something caused the initialization - * operation to fail. - */ - private RyaDetails initializeRyaDetails(final String instanceName, final InstallConfiguration installConfig) - throws AlreadyInitializedException, RyaDetailsRepositoryException { - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); - - // Build the PCJ Index details. - final PCJIndexDetails.Builder pcjDetailsBuilder = PCJIndexDetails.builder() - .setEnabled(installConfig.isPcjIndexEnabled()); - if(installConfig.getFluoPcjAppName().isPresent()) { - final String fluoPcjAppName = installConfig.getFluoPcjAppName().get(); - pcjDetailsBuilder.setFluoDetails(new FluoDetails( fluoPcjAppName )); - } - - final RyaDetails details = RyaDetails.builder() - // General Metadata - .setRyaInstanceName(instanceName) - .setRyaVersion( getVersion() ) - - // Secondary Index Values - .setGeoIndexDetails( - new GeoIndexDetails(installConfig.isGeoIndexEnabled())) - .setTemporalIndexDetails( - new TemporalIndexDetails(installConfig.isTemporalIndexEnabled())) - .setFreeTextDetails( - new FreeTextIndexDetails(installConfig.isFreeTextIndexEnabled())) - .setEntityCentricIndexDetails( - new EntityCentricIndexDetails(installConfig.isEntityCentrixIndexEnabled())) - .setPCJIndexDetails( pcjDetailsBuilder ) - - // Statistics values. - .setProspectorDetails( - new ProspectorDetails(Optional.<Date>absent()) ) - .setJoinSelectivityDetails( - new JoinSelectivityDetails(Optional.<Date>absent()) ) - .build(); - - // Initialize the table. - detailsRepo.initialize(details); - - return details; - } - - /** - * Builds a {@link AccumuloRdfConfiguration} object that will be used by the - * Rya DAO to initialize all of the tables it will need. - * - * @param connectionDetails - Indicates how to connect to Accumulo. (not null) - * @param details - Indicates what needs to be installed. (not null) - * @return A Rya Configuration object that can be used to perform the install. - */ - private static AccumuloRdfConfiguration makeRyaConfig(final AccumuloConnectionDetails connectionDetails, final RyaDetails details) { - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - - // The Rya Instance Name is used as a prefix for the index tables in Accumulo. - conf.setTablePrefix( details.getRyaInstanceName() ); - - // Enable the indexers that the instance is configured to use. - // TODO fix me, not sure why the install command is here. -// conf.set(ConfigUtils.USE_GEO, "" + details.getGeoIndexDetails().isEnabled() ); - conf.set(ConfigUtils.USE_FREETEXT, "" + details.getFreeTextIndexDetails().isEnabled() ); - conf.set(ConfigUtils.USE_TEMPORAL, "" + details.getTemporalIndexDetails().isEnabled() ); - conf.set(ConfigUtils.USE_ENTITY, "" + details.getEntityCentricIndexDetails().isEnabled()); - - conf.set(ConfigUtils.USE_PCJ, "" + details.getPCJIndexDetails().isEnabled() ); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString()); - - final Optional<FluoDetails> fluoHolder = details.getPCJIndexDetails().getFluoDetails(); - final PrecomputedJoinUpdaterType updaterType = fluoHolder.isPresent() ? PrecomputedJoinUpdaterType.FLUO : PrecomputedJoinUpdaterType.NO_UPDATE; - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, updaterType.toString()); - - // XXX The Accumulo implementation of the secondary indices make need all - // of the accumulo connector's parameters to initialize themselves, so - // we need to include them here. It would be nice if the secondary - // indexers used the connector that is provided to them instead of - // building a new one. - conf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); - - // This initializes the living indexers that will be used by the application and - // caches them within the configuration object so that they may be used later. - ConfigUtils.setIndexers(conf); - - return conf; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java deleted file mode 100644 index 1be7cd8..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.admin.TableOperations; - -import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.client.InstanceExists; -import mvm.rya.api.client.RyaClientException; - -/** - * An Accumulo implementation of the {@link InstanceExists} command. - */ -@ParametersAreNonnullByDefault -public class AccumuloInstanceExists extends AccumuloCommand implements InstanceExists { - - /** - * Constructs an insatnce of {@link AccumuloInstanceExists}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloInstanceExists(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - } - - @Override - public boolean exists(final String instanceName) throws RyaClientException { - requireNonNull( instanceName ); - - final TableOperations tableOps = getConnector().tableOperations(); - - // Newer versions of Rya will have a Rya Details table. - final String ryaDetailsTableName = instanceName + AccumuloRyaInstanceDetailsRepository.INSTANCE_DETAILS_TABLE_NAME; - if(tableOps.exists(ryaDetailsTableName)) { - return true; - } - - // However, older versions only have the data tables. - final String spoTableName = instanceName + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; - final String posTableName = instanceName + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; - final String ospTableName = instanceName + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; - if(tableOps.exists(spoTableName) && tableOps.exists(posTableName) && tableOps.exists(ospTableName)) { - return true; - } - - return false; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java deleted file mode 100644 index 86d96b8..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; - -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.client.ListInstances; -import mvm.rya.api.client.RyaClientException; - -/** - * An Accumulo implementation of the {@link ListInstances} command. - */ -@ParametersAreNonnullByDefault -public class AccumuloListInstances extends AccumuloCommand implements ListInstances { - - private final Pattern spoPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); - private final Pattern ospPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); - private final Pattern poPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); - - /** - * Constructs an instance of {@link AccumuloListInstances}. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) - */ - public AccumuloListInstances(final AccumuloConnectionDetails connectionDetails, final Connector connector) { - super(connectionDetails, connector); - } - - @Override - public List<String> listInstances() throws RyaClientException { - // Figure out what the instance names might be. - final Map<String, InstanceTablesFound> proposedInstanceNames = new HashMap<>(); - - for(final String table : getConnector().tableOperations().list()) { - // SPO table - final Matcher spoMatcher = spoPattern.matcher(table); - if(spoMatcher.find()) { - final String instanceName = spoMatcher.group(1); - makeOrGetInstanceTables(proposedInstanceNames, instanceName).setSpoFound(); - } - - // OSP table - final Matcher ospMatcher = ospPattern.matcher(table); - if(ospMatcher.find()) { - final String instanceName = ospMatcher.group(1); - makeOrGetInstanceTables(proposedInstanceNames, instanceName).setOspFound(); - } - - // PO table - final Matcher poMatcher = poPattern.matcher(table); - if(poMatcher.find()) { - final String instanceName = poMatcher.group(1); - makeOrGetInstanceTables(proposedInstanceNames, instanceName).setPoFound(); - } - } - - // Determine which of the proposed names fit the expected Rya table structures. - final List<String> instanceNames = new ArrayList<>(); - for(final Entry<String, InstanceTablesFound> entry : proposedInstanceNames.entrySet()) { - final InstanceTablesFound tables = entry.getValue(); - if(tables.allFlagsSet()) { - instanceNames.add(entry.getKey()); - } - } - - return instanceNames; - } - - private InstanceTablesFound makeOrGetInstanceTables(final Map<String, InstanceTablesFound> lookup, final String instanceName) { - if(!lookup.containsKey(instanceName)) { - lookup.put(instanceName, new InstanceTablesFound()); - } - return lookup.get(instanceName); - } - - /** - * Flags that are used to determine if a String is a Rya Instance name. - */ - @ParametersAreNonnullByDefault - private static class InstanceTablesFound { - private boolean spoFound = false; - private boolean ospFound = false; - private boolean poFound = false; - - /** - * Sets the SPO table as seen. - */ - public void setSpoFound() { - spoFound = true; - } - - /** - * Sets the OSP table as seen. - */ - public void setOspFound() { - ospFound = true; - } - - /** - * Sets the POS table as seen. - */ - public void setPoFound() { - poFound = true; - } - - /** - * @return {@code true} if all of the flags have been set; otherwise {@code false}. - */ - public boolean allFlagsSet() { - return spoFound && ospFound && poFound; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java deleted file mode 100644 index 102f667..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; - -import mvm.rya.api.client.RyaClient; - -/** - * Constructs instance of {@link RyaClient} that are connected to instance of - * Rya hosted by Accumulo clusters. - */ -@ParametersAreNonnullByDefault -public class AccumuloRyaClientFactory { - - /** - * Initialize a set of {@link RyaClient} that will interact with an instance of - * Rya that is hosted by an Accumulo cluster. - * - * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) - * @param connector - The Accumulo connector the commands will use. (not null) - * @return The initialized commands. - */ - public static RyaClient build( - final AccumuloConnectionDetails connectionDetails, - final Connector connector) { - requireNonNull(connectionDetails); - requireNonNull(connector); - - // Build the RyaCommands option with the initialized commands. - return new RyaClient( - new AccumuloInstall(connectionDetails, connector), - new AccumuloCreatePCJ(connectionDetails, connector), - new AccumuloDeletePCJ(connectionDetails, connector), - new AccumuloBatchUpdatePCJ(connectionDetails, connector), - new AccumuloGetInstanceDetails(connectionDetails, connector), - new AccumuloInstanceExists(connectionDetails, connector), - new AccumuloListInstances(connectionDetails, connector)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java deleted file mode 100644 index 0c6c2f1..0000000 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.api.client.accumulo; - -import static java.util.Objects.requireNonNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.config.FluoConfiguration; - -/** - * Creates {@link FluoClient}s that are connected to a specific Fluo Application. - */ -@ParametersAreNonnullByDefault -public class FluoClientFactory { - - /** - * Create a {@link FluoClient} that uses the provided connection details. - * - * @param username - The username the connection will use. (not null) - * @param password - The password the connection will use. (not null) - * @param instanceName - The name of the Accumulo instance. (not null) - * @param zookeeperHostnames - A comma delimited list of the Zookeeper server hostnames. (not null) - * @param fluoAppName - The Fluo Application the client will be connected to. (not null) - * @return A {@link FluoClient} that may be used to access the Fluo Application. - */ - public FluoClient connect( - final String username, - final String password, - final String instanceName, - final String zookeeperHostnames, - final String fluoAppName) { - requireNonNull(username); - requireNonNull(password); - requireNonNull(instanceName); - requireNonNull(zookeeperHostnames); - requireNonNull(fluoAppName); - - final FluoConfiguration fluoConfig = new FluoConfiguration(); - - // Fluo configuration values. - fluoConfig.setApplicationName( fluoAppName ); - fluoConfig.setInstanceZookeepers( zookeeperHostnames + "/fluo" ); - - // Accumulo Connection Stuff. - fluoConfig.setAccumuloZookeepers( zookeeperHostnames ); - fluoConfig.setAccumuloInstance( instanceName ); - fluoConfig.setAccumuloUser( username ); - fluoConfig.setAccumuloPassword( password ); - - // Connect the client. - return FluoFactory.newClient(fluoConfig); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/DocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/DocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/DocIdIndexer.java deleted file mode 100644 index 21d5de7..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/DocIdIndexer.java +++ /dev/null @@ -1,47 +0,0 @@ -package mvm.rya.indexing; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; - -import mvm.rya.indexing.accumulo.entity.StarQuery; - -import org.apache.accumulo.core.client.TableNotFoundException; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; - -public interface DocIdIndexer extends Closeable { - - - - public abstract CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, - Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException; - - - - @Override - public abstract void close() throws IOException; - -}
