http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java new file mode 100644 index 0000000..4cf0935 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import org.apache.fluo.api.client.FluoClient; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.CreatePCJ; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.instance.RyaDetailsUpdater; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * An Accumulo 
implementation of the {@link CreatePCJ} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloCreatePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloCreatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(sparql); + + final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = ryaDetailsHolder.isPresent(); + if(!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails(); + final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled(); + if(!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + // Create the PCJ table that will receive the index results. 
+ final String pcjId; + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName); + try { + pcjId = pcjStorage.createPcj(sparql); + } catch (final PCJStorageException e) { + throw new RyaClientException("Problem while initializing the PCJ table.", e); + } + + // If a Fluo application is being used, task it with updating the PCJ. + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + if(fluoDetailsHolder.isPresent()) { + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } + + // Update the Rya Details to indicate the PCJ is being updated incrementally. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they are incrementally updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); + + // Replace the old PCJ Details with the updated ones. 
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); + } + } + + // Return the ID that was assigned to the PCJ. + return pcjId; + } + + private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException { + requireNonNull(pcjStorage); + requireNonNull(pcjId); + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName); + + // Setup the Rya client that is able to talk to scan Rya's statements. + final RyaSailRepository ryaSailRepo = makeRyaRepository(getConnector(), ryaInstance); + + // Initialize the PCJ within the Fluo application. + final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); + fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaSailRepo); + } + + private static RyaSailRepository makeRyaRepository(final Connector connector, final String ryaInstance) throws RepositoryException { + checkNotNull(connector); + checkNotNull(ryaInstance); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix( ryaInstance ); + + // Connect to the Rya repo using the provided Connector. 
+ final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); + accumuloRyaDao.setConnector(connector); + accumuloRyaDao.setConf(ryaConf); + + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + ryaStore.setRyaDAO(accumuloRyaDao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + return ryaRepo; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java new file mode 100644 index 0000000..233a265 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; + +import org.apache.fluo.api.client.FluoClient; +import mvm.rya.api.client.DeletePCJ; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; + +/** + * An Accumulo implementation of the {@link DeletePCJ} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { + + private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class); + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloDeletePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. 
(not null) + */ + public AccumuloDeletePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public void deletePCJ(final String instanceName, final String pcjId) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(pcjId); + + final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = originalDetails.isPresent(); + if(!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final boolean pcjIndexingEnabeld = originalDetails.get().getPCJIndexDetails().isEnabled(); + if(!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + final boolean pcjExists = originalDetails.get().getPCJIndexDetails().getPCJDetails().containsKey( pcjId ); + if(!pcjExists) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ with ID '%s'.", instanceName, pcjId)); + } + + // If the PCJ was being maintained by a Fluo application, then stop that process. 
+ final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails(); + final PCJDetails droppedPcjDetails = pcjIndexDetails.getPCJDetails().get( pcjId ); + if(droppedPcjDetails.getUpdateStrategy().isPresent()) { + if(droppedPcjDetails.getUpdateStrategy().get() == PCJUpdateStrategy.INCREMENTAL) { + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + + if(fluoDetailsHolder.isPresent()) { + final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName(); + stopUpdatingPCJ(fluoAppName, pcjId); + } else { + log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are " + + "missing for the Rya instance named '%s'.", instanceName)); + } + } + } + + // Drop the table that holds the PCJ results from Accumulo. + final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName); + try { + pcjs.dropPcj(pcjId); + } catch (final PCJStorageException e) { + throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e); + } + } + + private void stopUpdatingPCJ(final String fluoAppName, final String pcjId) { + requireNonNull(fluoAppName); + requireNonNull(pcjId); + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName); + + // Delete the PCJ from the Fluo App. 
+ new DeletePcj(1000).deletePcj(fluoClient, pcjId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloGetInstanceDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloGetInstanceDetails.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloGetInstanceDetails.java new file mode 100644 index 0000000..a2fed78 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloGetInstanceDetails.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import com.google.common.base.Optional; + +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; + +/** + * An Accumulo implementation of the {@link GetInstanceDetails} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloGetInstanceDetails extends AccumuloCommand implements GetInstanceDetails { + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloGetInstanceDetails}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloGetInstanceDetails(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public Optional<RyaDetails> getDetails(final String instanceName) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + + // Ensure the Rya instance exists. 
+ if(!instanceExists.exists(instanceName)) { + throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", instanceName)); + } + + // If the instance has details, then return them. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + return Optional.of( detailsRepo.getRyaInstanceDetails() ); + } catch (final NotInitializedException e) { + return Optional.absent(); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("Could not fetch the Rya instance's details.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstall.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstall.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstall.java new file mode 100644 index 0000000..08c1932 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstall.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Date; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * An Accumulo 
implementation of the {@link Install} command. + */ + +@ParametersAreNonnullByDefault +public class AccumuloInstall extends AccumuloCommand implements Install { + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloInstall}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloInstall(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public void install(final String instanceName, final InstallConfiguration installConfig) throws DuplicateInstanceNameException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(installConfig); + + // Check to see if a Rya instance has already been installed with this name. + if(instanceExists.exists(instanceName)) { + throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + + "with the name '" + instanceName + "'. Try again with a different name."); + } + + // Initialize the Rya Details table. + RyaDetails details; + try { + details = initializeRyaDetails(instanceName, installConfig); + } catch (final AlreadyInitializedException e) { + // This can only happen if somebody else installs an instance of Rya with the name between the check and now. + throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + + "with the name '" + instanceName + "'. Try again with a different name."); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("The RyaDetails couldn't be initialized. 
Details: " + e.getMessage(), e); + } + + // Initialize the rest of the tables used by the Rya instance. + final AccumuloRdfConfiguration ryaConfig = makeRyaConfig(getAccumuloConnectionDetails(), details); + try { + final Sail ryaSail = RyaSailFactory.getInstance(ryaConfig); + ryaSail.shutDown(); + } catch (final AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new RyaClientException("Could not initialize all of the tables for the new Rya instance. " + + "This instance may be left in a bad state.", e); + } catch (final SailException e) { + throw new RyaClientException("Problem shutting down the Sail object used to install Rya.", e); + } + } + + /** + * @return The version of the application as reported by the manifest. + */ + private String getVersion() { + return "" + this.getClass().getPackage().getImplementationVersion(); + } + + /** + * Initializes the {@link RyaDetails} and stores them for the new instance. + * + * @param instanceName - The name of the instance that is being created. (not null) + * @param installConfig - The instance's install configuration. (not null) + * @return The {@link RyaDetails} that were stored. + * @throws AlreadyInitializedException Could not be initialized because + * a table with this instance name has already exists and is holding the details. + * @throws RyaDetailsRepositoryException Something caused the initialization + * operation to fail. + */ + private RyaDetails initializeRyaDetails(final String instanceName, final InstallConfiguration installConfig) + throws AlreadyInitializedException, RyaDetailsRepositoryException { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + + // Build the PCJ Index details. 
+ final PCJIndexDetails.Builder pcjDetailsBuilder = PCJIndexDetails.builder() + .setEnabled(installConfig.isPcjIndexEnabled()); + if(installConfig.getFluoPcjAppName().isPresent()) { + final String fluoPcjAppName = installConfig.getFluoPcjAppName().get(); + pcjDetailsBuilder.setFluoDetails(new FluoDetails( fluoPcjAppName )); + } + + final RyaDetails details = RyaDetails.builder() + // General Metadata + .setRyaInstanceName(instanceName) + .setRyaVersion( getVersion() ) + + // Secondary Index Values + .setGeoIndexDetails( + new GeoIndexDetails(installConfig.isGeoIndexEnabled())) + .setTemporalIndexDetails( + new TemporalIndexDetails(installConfig.isTemporalIndexEnabled())) + .setFreeTextDetails( + new FreeTextIndexDetails(installConfig.isFreeTextIndexEnabled())) + .setEntityCentricIndexDetails( + new EntityCentricIndexDetails(installConfig.isEntityCentrixIndexEnabled())) + .setPCJIndexDetails( pcjDetailsBuilder ) + + // Statistics values. + .setProspectorDetails( + new ProspectorDetails(Optional.<Date>absent()) ) + .setJoinSelectivityDetails( + new JoinSelectivityDetails(Optional.<Date>absent()) ) + .build(); + + // Initialize the table. + detailsRepo.initialize(details); + + return details; + } + + /** + * Builds a {@link AccumuloRdfConfiguration} object that will be used by the + * Rya DAO to initialize all of the tables it will need. + * + * @param connectionDetails - Indicates how to connect to Accumulo. (not null) + * @param details - Indicates what needs to be installed. (not null) + * @return A Rya Configuration object that can be used to perform the install. + */ + private static AccumuloRdfConfiguration makeRyaConfig(final AccumuloConnectionDetails connectionDetails, final RyaDetails details) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + + // The Rya Instance Name is used as a prefix for the index tables in Accumulo. 
+ conf.setTablePrefix( details.getRyaInstanceName() ); + + // Enable the indexers that the instance is configured to use. + // TODO fix me, not sure why the install command is here. +// conf.set(ConfigUtils.USE_GEO, "" + details.getGeoIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_FREETEXT, "" + details.getFreeTextIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_TEMPORAL, "" + details.getTemporalIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_ENTITY, "" + details.getEntityCentricIndexDetails().isEnabled()); + + conf.set(ConfigUtils.USE_PCJ, "" + details.getPCJIndexDetails().isEnabled() ); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString()); + + final Optional<FluoDetails> fluoHolder = details.getPCJIndexDetails().getFluoDetails(); + final PrecomputedJoinUpdaterType updaterType = fluoHolder.isPresent() ? PrecomputedJoinUpdaterType.FLUO : PrecomputedJoinUpdaterType.NO_UPDATE; + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, updaterType.toString()); + + // XXX The Accumulo implementation of the secondary indices make need all + // of the accumulo connector's parameters to initialize themselves, so + // we need to include them here. It would be nice if the secondary + // indexers used the connector that is provided to them instead of + // building a new one. + conf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); + + // This initializes the living indexers that will be used by the application and + // caches them within the configuration object so that they may be used later. 
+ ConfigUtils.setIndexers(conf); + + return conf; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstanceExists.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstanceExists.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstanceExists.java new file mode 100644 index 0000000..1be7cd8 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloInstanceExists.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.TableOperations; + +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; + +/** + * An Accumulo implementation of the {@link InstanceExists} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloInstanceExists extends AccumuloCommand implements InstanceExists { + + /** + * Constructs an insatnce of {@link AccumuloInstanceExists}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloInstanceExists(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public boolean exists(final String instanceName) throws RyaClientException { + requireNonNull( instanceName ); + + final TableOperations tableOps = getConnector().tableOperations(); + + // Newer versions of Rya will have a Rya Details table. + final String ryaDetailsTableName = instanceName + AccumuloRyaInstanceDetailsRepository.INSTANCE_DETAILS_TABLE_NAME; + if(tableOps.exists(ryaDetailsTableName)) { + return true; + } + + // However, older versions only have the data tables. 
+ final String spoTableName = instanceName + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + final String posTableName = instanceName + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; + final String ospTableName = instanceName + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; + if(tableOps.exists(spoTableName) && tableOps.exists(posTableName) && tableOps.exists(ospTableName)) { + return true; + } + + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListInstances.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListInstances.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListInstances.java new file mode 100644 index 0000000..86d96b8 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListInstances.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.ListInstances; +import mvm.rya.api.client.RyaClientException; + +/** + * An Accumulo implementation of the {@link ListInstances} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloListInstances extends AccumuloCommand implements ListInstances { + + private final Pattern spoPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + private final Pattern ospPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + private final Pattern poPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + + /** + * Constructs an instance of {@link AccumuloListInstances}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo that hosts the Rya instance. (not null) + */ + public AccumuloListInstances(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public List<String> listInstances() throws RyaClientException { + // Figure out what the instance names might be.
+ final Map<String, InstanceTablesFound> proposedInstanceNames = new HashMap<>(); + + for(final String table : getConnector().tableOperations().list()) { + // SPO table + final Matcher spoMatcher = spoPattern.matcher(table); + if(spoMatcher.find()) { + final String instanceName = spoMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setSpoFound(); + } + + // OSP table + final Matcher ospMatcher = ospPattern.matcher(table); + if(ospMatcher.find()) { + final String instanceName = ospMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setOspFound(); + } + + // PO table + final Matcher poMatcher = poPattern.matcher(table); + if(poMatcher.find()) { + final String instanceName = poMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setPoFound(); + } + } + + // Determine which of the proposed names fit the expected Rya table structures. + final List<String> instanceNames = new ArrayList<>(); + for(final Entry<String, InstanceTablesFound> entry : proposedInstanceNames.entrySet()) { + final InstanceTablesFound tables = entry.getValue(); + if(tables.allFlagsSet()) { + instanceNames.add(entry.getKey()); + } + } + + return instanceNames; + } + + private InstanceTablesFound makeOrGetInstanceTables(final Map<String, InstanceTablesFound> lookup, final String instanceName) { + if(!lookup.containsKey(instanceName)) { + lookup.put(instanceName, new InstanceTablesFound()); + } + return lookup.get(instanceName); + } + + /** + * Flags that are used to determine if a String is a Rya Instance name. + */ + @ParametersAreNonnullByDefault + private static class InstanceTablesFound { + private boolean spoFound = false; + private boolean ospFound = false; + private boolean poFound = false; + + /** + * Sets the SPO table as seen. + */ + public void setSpoFound() { + spoFound = true; + } + + /** + * Sets the OSP table as seen. 
+ */ + public void setOspFound() { + ospFound = true; + } + + /** + * Sets the POS table as seen. + */ + public void setPoFound() { + poFound = true; + } + + /** + * @return {@code true} if all of the flags have been set; otherwise {@code false}. + */ + public boolean allFlagsSet() { + return spoFound && ospFound && poFound; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java new file mode 100644 index 0000000..102f667 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import mvm.rya.api.client.RyaClient; + +/** + * Constructs instances of {@link RyaClient} that are connected to instances of + * Rya hosted by Accumulo clusters. + */ +@ParametersAreNonnullByDefault +public class AccumuloRyaClientFactory { + + /** + * Initialize a set of {@link RyaClient} that will interact with an instance of + * Rya that is hosted by an Accumulo cluster. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - The Accumulo connector the commands will use. (not null) + * @return The initialized commands. + */ + public static RyaClient build( + final AccumuloConnectionDetails connectionDetails, + final Connector connector) { + requireNonNull(connectionDetails); + requireNonNull(connector); + + // Build the RyaClient object with the initialized commands.
+ return new RyaClient( + new AccumuloInstall(connectionDetails, connector), + new AccumuloCreatePCJ(connectionDetails, connector), + new AccumuloDeletePCJ(connectionDetails, connector), + new AccumuloBatchUpdatePCJ(connectionDetails, connector), + new AccumuloGetInstanceDetails(connectionDetails, connector), + new AccumuloInstanceExists(connectionDetails, connector), + new AccumuloListInstances(connectionDetails, connector)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/FluoClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/FluoClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/FluoClientFactory.java new file mode 100644 index 0000000..0c6c2f1 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/FluoClientFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.config.FluoConfiguration; + +/** + * Creates {@link FluoClient}s that are connected to a specific Fluo Application. + */ +@ParametersAreNonnullByDefault +public class FluoClientFactory { + + /** + * Create a {@link FluoClient} that uses the provided connection details. + * + * @param username - The username the connection will use. (not null) + * @param password - The password the connection will use. (not null) + * @param instanceName - The name of the Accumulo instance. (not null) + * @param zookeeperHostnames - A comma delimited list of the Zookeeper server hostnames. (not null) + * @param fluoAppName - The Fluo Application the client will be connected to. (not null) + * @return A {@link FluoClient} that may be used to access the Fluo Application. + */ + public FluoClient connect( + final String username, + final String password, + final String instanceName, + final String zookeeperHostnames, + final String fluoAppName) { + requireNonNull(username); + requireNonNull(password); + requireNonNull(instanceName); + requireNonNull(zookeeperHostnames); + requireNonNull(fluoAppName); + + final FluoConfiguration fluoConfig = new FluoConfiguration(); + + // Fluo configuration values. + fluoConfig.setApplicationName( fluoAppName ); + fluoConfig.setInstanceZookeepers( zookeeperHostnames + "/fluo" ); + + // Accumulo Connection Stuff. + fluoConfig.setAccumuloZookeepers( zookeeperHostnames ); + fluoConfig.setAccumuloInstance( instanceName ); + fluoConfig.setAccumuloUser( username ); + fluoConfig.setAccumuloPassword( password ); + + // Connect the client. 
+ return FluoFactory.newClient(fluoConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/DocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/DocIdIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/DocIdIndexer.java new file mode 100644 index 0000000..21d5de7 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/DocIdIndexer.java @@ -0,0 +1,47 @@ +package mvm.rya.indexing; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import info.aduna.iteration.CloseableIteration; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +import mvm.rya.indexing.accumulo.entity.StarQuery; + +import org.apache.accumulo.core.client.TableNotFoundException; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +public interface DocIdIndexer extends Closeable { + + + + public abstract CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, + Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException; + + + + @Override + public abstract void close() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java new file mode 100644 index 0000000..d148b74 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java @@ -0,0 +1,339 @@ +package mvm.rya.indexing; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.commons.lang.Validate; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.And; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.collect.Lists; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.FreeTextTupleSet; +import 
mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import mvm.rya.indexing.mongodb.temporal.MongoTemporalIndexer; + +public class FilterFunctionOptimizer implements QueryOptimizer, Configurable { + private static final Logger LOG = Logger.getLogger(FilterFunctionOptimizer.class); + private final ValueFactory valueFactory = new ValueFactoryImpl(); + + private Configuration conf; + private FreeTextIndexer freeTextIndexer; + private TemporalIndexer temporalIndexer; + private boolean init = false; + + public FilterFunctionOptimizer() { + } + + public FilterFunctionOptimizer(final AccumuloRdfConfiguration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, IOException, TableExistsException, NumberFormatException, UnknownHostException { + this.conf = conf; + init(); + } + + //setConf initializes FilterFunctionOptimizer so reflection can be used + //to create optimizer in RdfCloudTripleStoreConnection + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + //reset the init. 
+ init = false; + init(); + } + + private synchronized void init() { + if (!init) { + if (ConfigUtils.getUseMongo(conf)) { + freeTextIndexer = new MongoFreeTextIndexer(); + freeTextIndexer.setConf(conf); + temporalIndexer = new MongoTemporalIndexer(); + temporalIndexer.setConf(conf); + } else { + freeTextIndexer = new AccumuloFreeTextIndexer(); + freeTextIndexer.setConf(conf); + temporalIndexer = new AccumuloTemporalIndexer(); + temporalIndexer.setConf(conf); + } + init = true; + } + } + + @Override + public void optimize(final TupleExpr tupleExpr, final Dataset dataset, final BindingSet bindings) { + // find variables used in property and resource based searches: + final SearchVarVisitor searchVars = new SearchVarVisitor(); + tupleExpr.visit(searchVars); + // rewrites for property searches: + processPropertySearches(tupleExpr, searchVars.searchProperties); + + } + + + + private void processPropertySearches(final TupleExpr tupleExpr, final Collection<Var> searchProperties) { + final MatchStatementVisitor matchStatements = new MatchStatementVisitor(searchProperties); + tupleExpr.visit(matchStatements); + for (final StatementPattern matchStatement: matchStatements.matchStatements) { + final Var subject = matchStatement.getSubjectVar(); + if (subject.hasValue() && !(subject.getValue() instanceof Resource)) { + throw new IllegalArgumentException("Query error: Found " + subject.getValue() + ", expected an URI or BNode"); + } + Validate.isTrue(subject.hasValue() || subject.getName() != null); + Validate.isTrue(!matchStatement.getObjectVar().hasValue() && matchStatement.getObjectVar().getName() != null); + buildQuery(tupleExpr, matchStatement); + } + } + + private void buildQuery(final TupleExpr tupleExpr, final StatementPattern matchStatement) { + //If our IndexerExpr (to be) is the rhs-child of LeftJoin, we can safely make that a Join: + // the IndexerExpr will (currently) not return results that can deliver unbound variables. 
+ //This optimization should probably be generalized into a LeftJoin -> Join optimizer under certain conditions. Until that + // has been done, this code path at least takes care of queries generated by OpenSahara SparqTool that filter on OPTIONAL + // projections. E.g. summary~'full text search' (summary is optional). See #379 + if (matchStatement.getParentNode() instanceof LeftJoin) { + final LeftJoin leftJoin = (LeftJoin)matchStatement.getParentNode(); + if (leftJoin.getRightArg() == matchStatement && leftJoin.getCondition() == null) { + matchStatement.getParentNode().replaceWith(new Join(leftJoin.getLeftArg(), leftJoin.getRightArg())); + } + } + final FilterFunction fVisitor = new FilterFunction(matchStatement.getObjectVar().getName()); + tupleExpr.visit(fVisitor); + final List<IndexingExpr> results = Lists.newArrayList(); + for(int i = 0; i < fVisitor.func.size(); i++){ + results.add(new IndexingExpr(fVisitor.func.get(i), matchStatement, fVisitor.args.get(i))); + } + removeMatchedPattern(tupleExpr, matchStatement, new IndexerExprReplacer(results)); + } + + //find vars contained in filters + private static class SearchVarVisitor extends QueryModelVisitorBase<RuntimeException> { + private final Collection<Var> searchProperties = new ArrayList<Var>(); + + @Override + public void meet(final FunctionCall fn) { + final URI fun = new URIImpl(fn.getURI()); + final Var result = IndexingFunctionRegistry.getResultVarFromFunctionCall(fun, fn.getArgs()); + if (result != null && !searchProperties.contains(result)) { + searchProperties.add(result); + } + } + } + + //find StatementPatterns containing filter variables + private static class MatchStatementVisitor extends QueryModelVisitorBase<RuntimeException> { + private final Collection<Var> propertyVars; + private final Collection<Var> usedVars = new ArrayList<Var>(); + private final List<StatementPattern> matchStatements = new ArrayList<StatementPattern>(); + + public MatchStatementVisitor(final Collection<Var> 
propertyVars) { + this.propertyVars = propertyVars; + } + + @Override public void meet(final StatementPattern statement) { + final Var object = statement.getObjectVar(); + if (propertyVars.contains(object)) { + if (usedVars.contains(object)) { + throw new IllegalArgumentException("Illegal search, variable is used multiple times as object: " + object.getName()); + } else { + usedVars.add(object); + matchStatements.add(statement); + } + } + } + } + + private abstract class AbstractEnhanceVisitor extends QueryModelVisitorBase<RuntimeException> { + final String matchVar; + List<URI> func = Lists.newArrayList(); + List<Value[]> args = Lists.newArrayList(); + + public AbstractEnhanceVisitor(final String matchVar) { + this.matchVar = matchVar; + } + + protected void addFilter(final URI uri, final Value[] values) { + func.add(uri); + args.add(values); + } + } + + //create indexing expression for each filter matching var in filter StatementPattern + //replace old filter condition with true condition + private class FilterFunction extends AbstractEnhanceVisitor { + public FilterFunction(final String matchVar) { + super(matchVar); + } + + @Override + public void meet(final FunctionCall call) { + final URI fnUri = valueFactory.createURI(call.getURI()); + final Var resultVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(fnUri, call.getArgs()); + if (resultVar != null && resultVar.getName().equals(matchVar)) { + addFilter(valueFactory.createURI(call.getURI()), extractArguments(matchVar, call)); + if (call.getParentNode() instanceof Filter || call.getParentNode() instanceof And || call.getParentNode() instanceof LeftJoin) { + call.replaceWith(new ValueConstant(valueFactory.createLiteral(true))); + } else { + throw new IllegalArgumentException("Query error: Found " + call + " as part of an expression that is too complex"); + } + } + } + + private Value[] extractArguments(final String matchName, final FunctionCall call) { + final Value args[] = new 
Value[call.getArgs().size() - 1]; + int argI = 0; + for (int i = 0; i != call.getArgs().size(); ++i) { + final ValueExpr arg = call.getArgs().get(i); + if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) { + continue; + } + if (arg instanceof ValueConstant) { + args[argI] = ((ValueConstant)arg).getValue(); + } else if (arg instanceof Var && ((Var)arg).hasValue()) { + args[argI] = ((Var)arg).getValue(); + } else { + throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI"); + } + ++argI; + } + return args; + } + + @Override + public void meet(final Filter filter) { + //First visit children, then condition (reverse of default): + filter.getArg().visit(this); + filter.getCondition().visit(this); + } + } + + private void removeMatchedPattern(final TupleExpr tupleExpr, final StatementPattern pattern, final TupleExprReplacer replacer) { + final List<TupleExpr> indexTuples = replacer.createReplacement(pattern); + if (indexTuples.size() > 1) { + final VarExchangeVisitor vev = new VarExchangeVisitor(pattern); + tupleExpr.visit(vev); + Join join = new Join(indexTuples.remove(0), indexTuples.remove(0)); + for (final TupleExpr geo : indexTuples) { + join = new Join(join, geo); + } + pattern.replaceWith(join); + } else if (indexTuples.size() == 1) { + pattern.replaceWith(indexTuples.get(0)); + pattern.setParentNode(null); + } else { + throw new IllegalStateException("Must have at least one replacement for matched StatementPattern."); + } + } + + private interface TupleExprReplacer { + List<TupleExpr> createReplacement(TupleExpr org); + } + + //replace each filter pertinent StatementPattern with corresponding index expr + private class IndexerExprReplacer implements TupleExprReplacer { + private final List<IndexingExpr> indxExpr; + private final FUNCTION_TYPE type; + + public IndexerExprReplacer(final List<IndexingExpr> indxExpr) { + this.indxExpr = indxExpr; + final URI func = indxExpr.get(0).getFunction(); 
+ type = IndexingFunctionRegistry.getFunctionType(func); + } + + @Override + public List<TupleExpr> createReplacement(final TupleExpr org) { + final List<TupleExpr> indexTuples = Lists.newArrayList(); + switch (type) { + case FREETEXT: + for (final IndexingExpr indx : indxExpr) { + indexTuples.add(new FreeTextTupleSet(indx, freeTextIndexer)); + } + break; + case TEMPORAL: + for (final IndexingExpr indx : indxExpr) { + indexTuples.add(new TemporalTupleSet(indx, temporalIndexer)); + } + break; + default: + throw new IllegalArgumentException("Incorrect type!"); + } + return indexTuples; + } + } + + private static class VarExchangeVisitor extends QueryModelVisitorBase<RuntimeException> { + private final StatementPattern exchangeVar; + public VarExchangeVisitor(final StatementPattern sp) { + exchangeVar = sp; + } + + @Override + public void meet(final Join node) { + final QueryModelNode lNode = node.getLeftArg(); + if (lNode instanceof StatementPattern) { + exchangeVar.replaceWith(lNode); + node.setLeftArg(exchangeVar); + } else { + super.meet(node); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/FreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/FreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/FreeTextIndexer.java new file mode 100644 index 0000000..f6fb2c7 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/FreeTextIndexer.java @@ -0,0 +1,48 @@ +package mvm.rya.indexing; + +import java.io.IOException; + +import org.openrdf.model.Statement; +import org.openrdf.query.QueryEvaluationException; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +/** + * A repository to store, index, and retrieve {@link Statement}s based on freetext features. + */ +public interface FreeTextIndexer extends RyaSecondaryIndexer { + + /** + * Query the Free Text Index with specific constraints. A <code>null</code> or empty parameters imply no constraint. + * + * @param query + * the query to perform + * @param contraints + * the constraints on the statements returned + * @return the set of statements that meet the query and other constraints. 
+ * @throws IOException + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementConstraints contraints) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/GeoConstants.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/GeoConstants.java b/extras/indexing/src/main/java/org/apache/rya/indexing/GeoConstants.java new file mode 100644 index 0000000..a692edd --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/GeoConstants.java @@ -0,0 +1,46 @@ +package mvm.rya.indexing; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; + +/** + * A set of URIs used in GeoSPARQL + */ +public class GeoConstants { + public static final String NS_GEO = "http://www.opengis.net/ont/geosparql#"; + public static final String NS_GEOF = "http://www.opengis.net/def/function/geosparql/"; + + public static final URI XMLSCHEMA_OGC_WKT = new URIImpl(NS_GEO + "wktLiteral"); + public static final URI GEO_AS_WKT = new URIImpl(NS_GEO + "asWKT"); + + public static final URI XMLSCHEMA_OGC_GML = new URIImpl(NS_GEO + "gmlLiteral"); + public static final URI GEO_AS_GML = new URIImpl(NS_GEO + "asGML"); + + public static final URI GEO_SF_EQUALS = new URIImpl(NS_GEOF + "sfEquals"); + public static final URI GEO_SF_DISJOINT = new URIImpl(NS_GEOF + "sfDisjoint"); + public static final URI GEO_SF_INTERSECTS = new URIImpl(NS_GEOF + "sfIntersects"); + public static final URI GEO_SF_TOUCHES = new URIImpl(NS_GEOF + "sfTouches"); + public static final URI GEO_SF_CROSSES = new URIImpl(NS_GEOF + "sfCrosses"); + public static final URI GEO_SF_WITHIN = new URIImpl(NS_GEOF + "sfWithin"); + public static final URI GEO_SF_CONTAINS = new URIImpl(NS_GEOF + "sfContains"); + public static final URI GEO_SF_OVERLAPS = new URIImpl(NS_GEOF + "sfOverlaps"); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ExternalIndexMatcher.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ExternalIndexMatcher.java b/extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ExternalIndexMatcher.java new file mode 100644 index 0000000..ee3d444 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ExternalIndexMatcher.java @@ -0,0 +1,34 @@ +package mvm.rya.indexing.IndexPlanValidator; + +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.Iterator; + +import org.openrdf.query.algebra.TupleExpr; + +public interface ExternalIndexMatcher { + + + public Iterator<TupleExpr> getIndexedTuples(); + + + +}
