http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java deleted file mode 100644 index 311b745..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException; -import org.openrdf.model.URI; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; -import mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory; - -/** - * Updates the state of the Precomputed Join indices that are used by Rya. - */ -@ParametersAreNonnullByDefault -public class PrecomputedJoinIndexer extends AbstractAccumuloIndexer { - private static final Logger log = Logger - .getLogger(PrecomputedJoinIndexer.class); - - /** - * This configuration object must be set before {@link #init()} is invoked. - * It is set by {@link #setConf(Configuration)}. - */ - private Optional<Configuration> conf = Optional.absent(); - - /** - * The Accumulo Connector that must be used when accessing an Accumulo - * storage. This value is provided by {@link #setConnector(Connector)}. 
- */ - private Optional<Connector> accumuloConn = Optional.absent(); - - /** - * Provides access to the {@link Configuration} that was provided to this - * class using {@link #setConf(Configuration)}. - */ - private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() { - @Override - public Configuration get() { - return getConf(); - } - }; - - /** - * Provides access to the Accumulo {@link Connector} that was provided to - * this class using {@link #setConnector(Connector)}. - */ - private final Supplier<Connector> accumuloSupplier = new Supplier<Connector>() { - @Override - public Connector get() { - return accumuloConn.get(); - } - }; - - /** - * Creates and grants access to the {@link PrecomputedJoinStorage} that will - * be used to interact with the PCJ results that are stored and used by Rya. - */ - private final PrecomputedJoinStorageSupplier pcjStorageSupplier = new PrecomputedJoinStorageSupplier( - configSupplier, new AccumuloPcjStorageSupplier(configSupplier, - accumuloSupplier)); - - private PrecomputedJoinStorage pcjStorage; - - /** - * Creates and grants access to the {@link PrecomputedJoinUpdater}s that will - * be used to update the state stored within the PCJ tables that are stored - * in Accumulo. - */ - private Supplier<PrecomputedJoinUpdater> updaterSupplier; - - - @Override - public void setConf(final Configuration conf) { - this.conf = Optional.fromNullable(conf); - } - - @Override - public Configuration getConf() { - return conf.get(); - } - - /** - * Set the connector that will be used by {@link AccumuloPcjStorage} if the - * application is configured to store the PCJs within Accumulo. - */ - @Override - public void setConnector(final Connector connector) { - checkNotNull(connector); - accumuloConn = Optional.of(connector); - } - - /** - * This is invoked when the host {@link RyaDAO#init()} method is invoked. 
- */ - @Override - public void init() { - pcjStorage = pcjStorageSupplier.get(); - updaterSupplier = new PcjUpdaterSupplierFactory(configSupplier).getSupplier(); - updaterSupplier.get(); - } - - @Override - public void storeStatement(final RyaStatement statement) throws IOException { - checkNotNull(statement); - storeStatements(Collections.singleton(statement)); - } - - @Override - public void storeStatements(final Collection<RyaStatement> statements) - throws IOException { - checkNotNull(statements); - try { - updaterSupplier.get().addStatements(statements); - } catch (final PcjUpdateException e) { - throw new IOException( - "Could not update the PCJs by adding the provided statements.", - e); - } - } - - @Override - public void deleteStatement(final RyaStatement statement) - throws IOException { - checkNotNull(statement); - try { - Collection<RyaStatement> statements = Collections.singleton(statement); - updaterSupplier.get().deleteStatements(statements); - } catch (final PcjUpdateException e) { - throw new IOException( - "Could not update the PCJs by removing the provided statement.", - e); - } - } - - @Override - public void flush() throws IOException { - try { - updaterSupplier.get().flush(); - } catch (final PcjUpdateException e) { - throw new IOException("Could not flush the PCJ Updater.", e); - } - } - - @Override - public void close() { - try { - pcjStorage.close(); - } catch (final PCJStorageException e) { - log.error("Could not close the PCJ Storage instance.", e); - } - - try { - updaterSupplier.get().close(); - } catch (final PcjUpdateException e) { - log.error("Could not close the PCJ Updater instance.", e); - } - } - - /** - * This is invoked when the host {@link RyaDAO#destroy()} method is invoked. - */ - @Override - public void destroy() { - close(); - } - - /** - * Deletes all data from the PCJ indices that are managed by a - * {@link PrecomputedJoinStorage}. 
- */ - @Override - public void purge(final RdfCloudTripleStoreConfiguration configuration) { - - try { - for (final String pcjId : pcjStorage.listPcjs()) { - try { - pcjStorage.purge(pcjId); - } catch (final PCJStorageException e) { - log.error( - "Could not purge the PCJ index with id: " + pcjId, - e); - } - } - } catch (final PCJStorageException e) { - log.error( - "Could not purge the PCJ indicies because they could not be listed.", - e); - } - } - - /** - * Deletes all of the PCJ indices that are managed by - * {@link PrecomputedJoinStorage}. - */ - @Override - public void dropAndDestroy() { - try { - for (final String pcjId : pcjStorage.listPcjs()) { - try { - pcjStorage.dropPcj(pcjId); - } catch (final PCJStorageException e) { - log.error("Could not delete the PCJ index with id: " - + pcjId, e); - } - } - } catch (final PCJStorageException e) { - log.error( - "Could not delete the PCJ indicies because they could not be listed.", - e); - } - } - - @Override - public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) - throws IOException { - // We do not need to use the writer that also writes to the core RYA - // tables. - } - - @Override - public void dropGraph(final RyaURI... graphs) { - log.warn("PCJ indices do not store Graph metadata, so graph results can not be dropped."); - } - - @Override - public String getTableName() { - // This method makes assumptions about how PCJs are stored. It's only - // used by AccumuloRyaDAO to purge data, so it should be replaced with - // a purge() method. - log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented."); - return null; - } - - @Override - public Set<URI> getIndexablePredicates() { - return new HashSet<>(); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java deleted file mode 100644 index 3c76601..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external; - -import static com.google.common.base.Preconditions.checkNotNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import mvm.rya.api.persist.index.RyaSecondaryIndexer; -import mvm.rya.indexing.accumulo.ConfigUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Optional; - -/** - * Inspects the {@link Configuration} object that is provided to all instances - * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer} - * specific values. - */ -@ParametersAreNonnullByDefault -public class PrecomputedJoinIndexerConfig { - - /** - * Enumerates the different methodologies implemented to store the PCJ indices. - */ - public static enum PrecomputedJoinStorageType { - /** - * Stores each PCJ within an Accumulo table. - */ - ACCUMULO; - } - - /** - * Enumerates the different methodologies implemented to update the PCJ indices. - */ - public static enum PrecomputedJoinUpdaterType { - /** - * Incrementally updates the PCJs is pseudo-realtime new adds/deletes are encountered. - */ - FLUO, NO_UPDATE; - } - - // Indicates which implementation of PrecomputedJoinStorage to use. - public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; - - // Indicates which implementation of PrecomputedJoinUpdater to use. - public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; - public static final String USE_PCJ_FLUO_UPDATER = ConfigUtils.USE_PCJ_FLUO_UPDATER; - - // The configuration object that is provided to Secondary Indexing implementations. - private final Configuration config; - - /** - * Constructs an instance of {@link PrecomputedJoinIndexerConfig}. - * - * @param config - The {@link Configuration} object that is provided to - * all instance of {@link RyaSecondaryIndexer}. 
It will be inspected - * for {@link PrecomputedJoinIndexer} specific values. (not null) - */ - public PrecomputedJoinIndexerConfig(final Configuration config) { - this.config = checkNotNull(config); - } - - /** - * @return The type of {@link PrecomputedJoinStorage} to use. - */ - public Optional<PrecomputedJoinStorageType> getPcjStorageType() { - final String storageTypeString = config.get(PCJ_STORAGE_TYPE); - if(storageTypeString == null) { - return Optional.absent(); - } - - final PrecomputedJoinStorageType storageType = PrecomputedJoinStorageType.valueOf(storageTypeString); - return Optional.fromNullable(storageType); - } - - /** - * @return The type of {@link PrecomputedJoinUpdater} to use. - */ - public Optional<PrecomputedJoinUpdaterType> getPcjUpdaterType() { - final String updaterTypeString = config.get(PCJ_UPDATER_TYPE); - if(updaterTypeString == null) { - return Optional.absent(); - } - - final PrecomputedJoinUpdaterType updaterType = PrecomputedJoinUpdaterType.valueOf(updaterTypeString); - return Optional.fromNullable(updaterType); - } - - - - public boolean getUseFluoUpdater() { - return config.getBoolean(USE_PCJ_FLUO_UPDATER, false); - } - - - /** - * @return The configuration object that has been wrapped. 
- */ - public Configuration getConfig() { - return config; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java deleted file mode 100644 index d79989c..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Arrays; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; - -/** - * Creates an instance of {@link PrecomputedJoinStorage} based on the application's configuration. - */ -public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinStorage> { - - private final Supplier<Configuration> configSupplier; - private final AccumuloPcjStorageSupplier accumuloSupplier; - - /** - * Constructs an instance of {@link PrecomputedJoinStorageSupplier}. - * - * @param configSupplier - Provides access to the configuration of the - * application used to initialize the storage. (not null) - * @param accumuloSupplier - Used to create an Accumulo instance of the - * storage if that is the configured type. (not null) - */ - public PrecomputedJoinStorageSupplier( - final Supplier<Configuration> configSupplier, - final AccumuloPcjStorageSupplier accumuloSupplier) { - this.configSupplier = checkNotNull(configSupplier); - this.accumuloSupplier = checkNotNull(accumuloSupplier); - } - - @Override - public PrecomputedJoinStorage get() { - // Ensure a configuration has been set. - final Configuration config = configSupplier.get(); - checkNotNull(config, "Could not build the PrecomputedJoinStorage until the PrecomputedJoinIndexer has been configured."); - - final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); - - // Ensure the storage type has been set. 
- final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType(); - checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE + - "' property must have one of the following values: " + Arrays.toString(PrecomputedJoinStorageType.values())); - - // Create and return the configured storage. - switch(storageType.get()) { - case ACCUMULO: - return accumuloSupplier.get(); - - default: - throw new IllegalArgumentException("Unsupported PrecomputedJoinStorageType: " + storageType.get()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java deleted file mode 100644 index cabadb4..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; -import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier; - -/** - * Creates instance of {@link PrecomputedJoinUpdater} based on the application's configuration. - */ -public class PrecomputedJoinUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { - - private final Supplier<Configuration> configSupplier; - private final FluoPcjUpdaterSupplier fluoSupplier; - - /** - * Creates an instance of {@link PrecomputedJoinUpdaterSupplier}. - * - * @param configSupplier - Provides access to the configuration of the - * application used to initialize the updater. (not null) - * @param fluoSupplier - Used to create a Fluo instace of the updater - * if that is the configured type. (not null) - */ - public PrecomputedJoinUpdaterSupplier( - final Supplier<Configuration> configSupplier, - final FluoPcjUpdaterSupplier fluoSupplier) { - this.configSupplier = checkNotNull(configSupplier); - this.fluoSupplier = checkNotNull(fluoSupplier); - } - - @Override - public PrecomputedJoinUpdater get() { - // Ensure a configuration has been set. - final Configuration config = configSupplier.get(); - checkNotNull(config, "Can not build the PrecomputedJoinUpdater until the PrecomputedJoinIndexer has been configured."); - - final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); - - // Ensure an updater type has been set. 
- final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType(); - checkArgument(updaterType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE + - "' property must have one of the following values: " + PrecomputedJoinUpdaterType.values()); - - // Create and return the configured updater. - switch(updaterType.get()) { - case FLUO: - return fluoSupplier.get(); - - default: - throw new IllegalArgumentException("Unsupported PrecomputedJoinUpdaterType: " + updaterType.get()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java deleted file mode 100644 index 3f4806e..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external.accumulo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; - -/** - * Configuration values required to initialize a {@link AccumuloPcjStorage}. - */ -public class AccumuloPcjStorageConfig { - - private final RdfCloudTripleStoreConfiguration config; - - /** - * Constructs an instance of {@link AccumuloPcjStorageConfig}. - * - * @param config - The configuration values that will be interpreted. (not null) - */ - public AccumuloPcjStorageConfig(final Configuration config) { - checkNotNull(config); - - // Wrapping the config with this class so that we can use it's getTablePrefix() method. - this.config = new RdfCloudTripleStoreConfiguration(config) { - @Override - public RdfCloudTripleStoreConfiguration clone() { - return null; - } - }; - } - - /** - * @return The Rya Instance name the storage grants access to. - */ - public String getRyaInstanceName() { - return config.getTablePrefix(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java deleted file mode 100644 index b6d4121..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external.accumulo; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.accumulo.core.client.Connector; -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; - -/** - * Creates instances of {@link AccumuloPcjStorage} using the values found in a {@link Configuration}. - */ -public class AccumuloPcjStorageSupplier implements Supplier<AccumuloPcjStorage> { - - private final Supplier<Configuration> configSupplier; - private final Supplier<Connector> accumuloSupplier; - - /** - * Constructs an instance of {@link AccumuloPcjStorageSupplier}. - * - * @param configSupplier - Configures the {@link AccumuloPcjStorage} that is - * supplied by this class. (not null) - * @param accumuloSupplier - Provides the {@link Connector} that is used by - * the {@link AccumuloPcjStorage} that is supplied by this class. 
(not null) - */ - public AccumuloPcjStorageSupplier( - final Supplier<Configuration> configSupplier, - final Supplier<Connector> accumuloSupplier) { - this.configSupplier = checkNotNull(configSupplier); - this.accumuloSupplier = checkNotNull(accumuloSupplier); - } - - @Override - public AccumuloPcjStorage get() { - // Ensure a configuration has been set. - final Configuration config = configSupplier.get(); - checkNotNull(config, "Could not create a AccumuloPcjStorage because the application's configuration has not been provided yet."); - - // Ensure the correct storage type has been set. - final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); - - final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType(); - checkArgument(storageType.isPresent() && (storageType.get() == PrecomputedJoinStorageType.ACCUMULO), - "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE + - "' value be set to '" + PrecomputedJoinStorageType.ACCUMULO + "'."); - - // Ensure the Accumulo connector has been set. 
- final Connector accumuloConn = accumuloSupplier.get(); - checkNotNull(accumuloConn, "The Accumulo Connector must be set before initializing the AccumuloPcjStorage."); - - final String ryaInstanceName = new AccumuloPcjStorageConfig(config).getRyaInstanceName(); - return new AccumuloPcjStorage(accumuloConn, ryaInstanceName); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java deleted file mode 100644 index f53727d..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external.fluo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.client.FluoClient; -import mvm.rya.api.domain.RyaStatement; - -/** - * Updates the PCJ indices by forwarding the statement additions/removals to - * a Fluo application. - */ -@ParametersAreNonnullByDefault -public class FluoPcjUpdater implements PrecomputedJoinUpdater { - private static final Logger log = Logger.getLogger(FluoPcjUpdater.class); - - // Used to only print the unsupported delete operation once. - private boolean deleteWarningPrinted = false; - - private final FluoClient fluoClient; - private final InsertTriples insertTriples = new InsertTriples(); - private final String statementVis; - - /** - * Constructs an instance of {@link FluoPcjUpdater}. - * - * @param fluoClient - A connection to the Fluo table new statements will be - * inserted into and deleted from. (not null) - * @param statementVis - The visibility label that will be applied to all - * statements that are inserted via the Fluo PCJ updater. (not null) - */ - public FluoPcjUpdater(final FluoClient fluoClient, final String statementVis) { - this.fluoClient = checkNotNull(fluoClient); - this.statementVis = checkNotNull(statementVis); - } - - @Override - public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException { - insertTriples.insert(fluoClient, statements, Optional.of(statementVis)); - } - - @Override - public void deleteStatements(final Collection<RyaStatement> statements) throws PcjUpdateException { - // The Fluo application does not support statement deletion. 
- if(!deleteWarningPrinted) { - log.warn("The Fluo PCJ updating application does not support Statement deletion, " + - "but you are trying to use that feature. This may result in your PCJ index " + - "no longer reflecting the Statemetns that are stored in the core Rya tables."); - deleteWarningPrinted = true; - } - } - - @Override - public void flush() { - // The Fluo application does not do any batching, so this doesn't do anything. - } - - @Override - public void close() { - fluoClient.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java deleted file mode 100644 index 2a34a82..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external.fluo; - -import static com.google.common.base.Preconditions.checkNotNull; -import mvm.rya.indexing.accumulo.ConfigUtils; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Optional; - -/** - * Configuration values required to initialize a {@link FluoPcjUpdater}. - */ -public final class FluoPcjUpdaterConfig { - - // Defines which Fluo application is running for this instance of Rya. - public static final String FLUO_APP_NAME = ConfigUtils.FLUO_APP_NAME; - - // Values that define which Accumulo instance hosts the Fluo application's table. - public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS; - public static final String ACCUMULO_INSTANCE = ConfigUtils.CLOUDBASE_INSTANCE; - public static final String ACCUMULO_USERNAME = ConfigUtils.CLOUDBASE_USER; - public static final String ACCUMULO_PASSWORD = ConfigUtils.CLOUDBASE_PASSWORD; - - // Values that define the visibilities associated with statement that are inserted by the fluo updater. - public static final String STATEMENT_VISIBILITY = ConfigUtils.CLOUDBASE_AUTHS; - - // The configuration object that is provided to Secondary Indexing implementations. - private final Configuration config; - - /** - * Constructs an instance of {@link FluoPcjUpdaterConfig}. - * - * @param config - The configuration values that will be interpreted. (not null) - */ - public FluoPcjUpdaterConfig(final Configuration config) { - this.config = checkNotNull(config); - } - - /** - * @return The name of the Fluo Application this instance of RYA is - * using to incrementally update PCJs. - */ - public Optional<String> getFluoAppName() { - return Optional.fromNullable(config.get(FLUO_APP_NAME)); - } - - /** - * This value is the {@link #getAccumuloInstance()} value appended with the - * "/fluo" namespace. - * - * @return The zookeepers that are used to manage Fluo state. 
({@code null} - * if not configured) - */ - public Optional<String> getFluoZookeepers() { - final Optional<String> accumuloZookeepers = getAccumuloZookeepers(); - if(!accumuloZookeepers.isPresent()) { - return Optional.absent(); - } - return Optional.of( accumuloZookeepers.get() + "/fluo" ); - } - - /** - * @return The zookeepers used to connect to the Accumulo instance that - * is storing the state of the Fluo Application. - */ - public Optional<String> getAccumuloZookeepers() { - return Optional.fromNullable(config.get(ACCUMULO_ZOOKEEPERS)); - } - - /** - * @return The instance name of the Accumulo instance that is storing - * the state of the Fluo Application. - */ - public Optional<String> getAccumuloInstance() { - return Optional.fromNullable(config.get(ACCUMULO_INSTANCE)); - } - - /** - * @return The username the indexer will authenticate when connecting - * to the Accumulo instance that stores the state of the Fluo Application. - */ - public Optional<String> getAccumuloUsername() { - return Optional.fromNullable(config.get(ACCUMULO_USERNAME)); - } - - /** - * @return The password the indexer will authenticate when connecting - * to the Accumulo instance that stores the state of the Fluo Application. - */ - public Optional<String> getAccumuloPassword() { - return Optional.fromNullable(config.get(ACCUMULO_PASSWORD)); - } - - /** - * @return The visibility labels that will be attached to the statements - * that are inserted into the Fluo Application. 
- */ - public Optional<String> getStatementVisibility() { - return Optional.fromNullable(config.get(STATEMENT_VISIBILITY)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java deleted file mode 100644 index c0895ba..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external.fluo; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_INSTANCE; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_PASSWORD; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERNAME; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME; -import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.config.FluoConfiguration; - -import javax.annotation.ParametersAreNonnullByDefault; - -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -/** - * Creates instances of {@link FluoPcjUpdater} using the values found in a {@link Configuration}. - */ -@ParametersAreNonnullByDefault -public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { - - private final Supplier<Configuration> configSupplier; - - /** - * Constructs an instance of {@link FluoPcjUpdaterSupplier}. - * - * @param configSupplier - Configures the {@link FluoPcjUpdater} that is supplied by this class. 
(not null) - */ - public FluoPcjUpdaterSupplier(final Supplier<Configuration> configSupplier) { - this.configSupplier = checkNotNull(configSupplier); - } - - @Override - public FluoPcjUpdater get() { - final Configuration config = configSupplier.get(); - checkNotNull(config, "Could not create a FluoPcjUpdater because the application's configuration has not been provided yet."); - - // Ensure the correct updater type has been set. - final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); - - final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType(); - checkArgument(updaterType.isPresent() && updaterType.get() == PrecomputedJoinUpdaterType.FLUO, - "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE + - "' value be set to '" + PrecomputedJoinUpdaterType.FLUO + "'."); - - final FluoPcjUpdaterConfig fluoUpdaterConfig = new FluoPcjUpdaterConfig( indexerConfig.getConfig() ); - - // Make sure the required values are present. - checkArgument(fluoUpdaterConfig.getFluoAppName().isPresent(), "Missing configuration: " + FLUO_APP_NAME); - checkArgument(fluoUpdaterConfig.getFluoZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS); - checkArgument(fluoUpdaterConfig.getAccumuloZookeepers().isPresent(), "Missing configuration: " + ACCUMULO_ZOOKEEPERS); - checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + ACCUMULO_INSTANCE); - checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + ACCUMULO_USERNAME); - checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + ACCUMULO_PASSWORD); - checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + STATEMENT_VISIBILITY); - - // Fluo configuration values. 
- final FluoConfiguration fluoClientConfig = new FluoConfiguration(); - fluoClientConfig.setApplicationName( fluoUpdaterConfig.getFluoAppName().get() ); - fluoClientConfig.setInstanceZookeepers( fluoUpdaterConfig.getFluoZookeepers().get() ); - - // Accumulo Fluo Table configuration values. - fluoClientConfig.setAccumuloZookeepers( fluoUpdaterConfig.getAccumuloZookeepers().get() ); - fluoClientConfig.setAccumuloInstance( fluoUpdaterConfig.getAccumuloInstance().get() ); - fluoClientConfig.setAccumuloUser( fluoUpdaterConfig.getAccumuloUsername().get() ); - fluoClientConfig.setAccumuloPassword( fluoUpdaterConfig.getAccumuloPassword().get() ); - - final FluoClient fluoClient = FluoFactory.newClient(fluoClientConfig); - final String statementVisibilities = fluoUpdaterConfig.getStatementVisibility().get(); - return new FluoPcjUpdater(fluoClient, statementVisibilities); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java deleted file mode 100644 index 66a6b24..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
package mvm.rya.indexing.external.fluo;

import java.util.Collection;

import mvm.rya.api.domain.RyaStatement;
import mvm.rya.indexing.external.PrecomputedJoinIndexer;

import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;

/**
 * A {@link PrecomputedJoinUpdater} whose operations all do nothing. It is
 * meant to be used by {@link PrecomputedJoinIndexer} when the user has
 * specified neither Batch nor {@link FluoPcjUpdater} to update Precomputed Joins.
 */
public class NoOpUpdater implements PrecomputedJoinUpdater {

    /** Ignores the statements. */
    @Override
    public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
        // Intentionally empty.
    }

    /** Ignores the statements. */
    @Override
    public void deleteStatements(final Collection<RyaStatement> statements) throws PcjUpdateException {
        // Intentionally empty.
    }

    /** Does nothing; there is no buffered work. */
    @Override
    public void flush() throws PcjUpdateException {
        // Intentionally empty.
    }

    /** Does nothing; no resources are held. */
    @Override
    public void close() throws PcjUpdateException {
        // Intentionally empty.
    }

}
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external.fluo; - -import mvm.rya.indexing.external.PrecomputedJoinIndexer; - -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Supplier; - -/** - * A {@link Supplier} for {@link NoOpUdater}s. This Supplier is used by - * {@link PrecomputedJoinIndexer} when no update strategy is specified by the user. - * - */ -public class NoOpUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { - - @Override - public NoOpUpdater get() { - return new NoOpUpdater(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java deleted file mode 100644 index 0250b6d..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external.fluo; - -import mvm.rya.indexing.external.PrecomputedJoinIndexer; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; - -import org.apache.hadoop.conf.Configuration; -import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; - -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; - -/** - * A factory for {@link Supplier}s used by {@link PrecomputedJoinIndexer} to - * get all update strategies for precomputed joins for a given Rya instance. - * - */ -public class PcjUpdaterSupplierFactory { - - private Supplier<Configuration> configSupplier; - - public PcjUpdaterSupplierFactory(Supplier<Configuration> configSupplier) { - this.configSupplier = configSupplier; - } - - public Supplier<PrecomputedJoinUpdater> getSupplier() { - - PrecomputedJoinIndexerConfig config = new PrecomputedJoinIndexerConfig(configSupplier.get()); - //TODO this should not be read from the config. 
Instead, - //this information should be retrieved from the RyaDetails table - if(config.getUseFluoUpdater()) { - return Suppliers.memoize(new FluoPcjUpdaterSupplier(configSupplier)); - } - else { - return Suppliers.memoize(new NoOpUpdaterSupplier()); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java deleted file mode 100644 index 04b6d2d..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java +++ /dev/null @@ -1,604 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package mvm.rya.indexing.external.tupleSet; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.model.Value; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.impl.BindingImpl; -import org.openrdf.query.parser.ParsedTupleQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.sail.SailException; - -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import 
com.google.common.base.Preconditions; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -import info.aduna.iteration.CloseableIteration; -import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator; -import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator.HashJoinType; -import mvm.rya.accumulo.pcj.iterators.IteratorCombiner; -import mvm.rya.accumulo.pcj.iterators.PCJKeyToCrossProductBindingSetIterator; -import mvm.rya.accumulo.pcj.iterators.PCJKeyToJoinBindingSetIterator; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.utils.IteratorWrapper; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.pcj.matching.PCJOptimizerUtilities; -import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; - -/** - * During query planning, this node is inserted into the parsed query to - * represent part of the original query (a sub-query). This sub-query is the - * value returned by {@link ExternalTupleSet#getTupleExpr()}. The results - * associated with this sub-query are stored in an external Accumulo table, - * where accCon and tablename are the associated {@link Connector} and table - * name. During evaluation, the portion of the query in {@link AccumuloIndexSet} - * is evaluated by scanning the external Accumulo table. This class is extremely - * useful for caching queries and reusing results from previous SPARQL queries. - * <p> - * - * The the {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()} - * may have different variables than the query and variables stored in the - * external Accumulo table. The mapping of variables from the TupleExpr to the - * table variables are given by {@link ExternalTupleSet#getTableVarMap()}. 
In - * addition to allowing the variables to differ, it is possible for TupleExpr to - * have fewer variables than the table query--that is, some of the variables in - * the table query may appear as constants in the TupleExpr. Theses expression - * are extracted from TupleExpr by the methods - * {@link AccumuloIndexSet#getConstantConstraints()} and by the Visitor - * {@link ValueMapVisitor} to be used as constraints when scanning the Accumulo - * table. This allows for the pre-computed results to be used for a larger class - * of sub-queries. - * - */ -public class AccumuloIndexSet extends ExternalTupleSet implements - ExternalBatchingIterator { - - private final Connector accCon; // connector to Accumulo table where results - // are stored - private final String tablename; // name of Accumulo table - private List<String> varOrder = null; // orders in which results are written - // to table - private final PcjTables pcj = new PcjTables(); - private final Authorizations auths; - - - @Override - public Map<String, Set<String>> getSupportedVariableOrders() { - return this.getSupportedVariableOrderMap(); - } - - @Override - public String getSignature() { - return "AccumuloIndexSet(" + tablename + ") : " - + Joiner.on(", ").join(this.getTupleExpr().getBindingNames()); - } - - /** - * - * @param sparql - * - name of sparql query whose results will be stored in PCJ - * table - * @param accCon - * - connection to a valid Accumulo instance - * @param tablename - * - name of an existing PCJ table - * @throws MalformedQueryException - * @throws SailException - * @throws QueryEvaluationException - * @throws TableNotFoundException - * @throws AccumuloSecurityException - * @throws AccumuloException - * @throws PCJStorageException - */ - public AccumuloIndexSet(final String sparql, final Configuration conf, - final String tablename) throws MalformedQueryException, SailException, - QueryEvaluationException, TableNotFoundException, AccumuloException, AccumuloSecurityException, 
PCJStorageException { - this.tablename = tablename; - this.accCon = ConfigUtils.getConnector(conf); - this.auths = getAuthorizations(conf); - final SPARQLParser sp = new SPARQLParser(); - final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); - final TupleExpr te = pq.getTupleExpr(); - Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), - "TupleExpr is an invalid PCJ."); - - final Optional<Projection> projection = new ParsedQueryUtil() - .findProjection(pq); - if (!projection.isPresent()) { - throw new MalformedQueryException("SPARQL query '" + sparql - + "' does not contain a Projection."); - } - setProjectionExpr(projection.get()); - Set<VariableOrder> orders = null; - orders = pcj.getPcjMetadata(accCon, tablename).getVarOrders(); - - varOrder = Lists.newArrayList(); - for (final VariableOrder var : orders) { - varOrder.add(var.toString()); - } - setLocalityGroups(tablename, accCon, varOrder); - this.setSupportedVariableOrderMap(varOrder); - } - - /** - * - * @param accCon - * - connection to a valid Accumulo instance - * @param tablename - * - name of an existing PCJ table - * @throws MalformedQueryException - * @throws SailException - * @throws QueryEvaluationException - * @throws TableNotFoundException - * @throws AccumuloSecurityException - * @throws AccumuloException - */ - public AccumuloIndexSet(final Configuration conf, final String tablename) - throws MalformedQueryException, SailException, - QueryEvaluationException, TableNotFoundException, AccumuloException, AccumuloSecurityException { - this.accCon = ConfigUtils.getConnector(conf); - this.auths = getAuthorizations(conf); - PcjMetadata meta = null; - try { - meta = pcj.getPcjMetadata(accCon, tablename); - } catch (final PcjException e) { - e.printStackTrace(); - } - - this.tablename = tablename; - final SPARQLParser sp = new SPARQLParser(); - final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(meta.getSparql(), null); - - setProjectionExpr((Projection) 
pq.getTupleExpr()); - final Set<VariableOrder> orders = meta.getVarOrders(); - - varOrder = Lists.newArrayList(); - for (final VariableOrder var : orders) { - varOrder.add(var.toString()); - } - setLocalityGroups(tablename, accCon, varOrder); - this.setSupportedVariableOrderMap(varOrder); - } - - - private Authorizations getAuthorizations(final Configuration conf) { - final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, ""); - if (authString.isEmpty()) { - return new Authorizations(); - } - return new Authorizations(authString.split(",")); - } - - /** - * returns size of table for query planning - */ - @Override - public double cardinality() { - double cardinality = 0; - try { - cardinality = pcj.getPcjMetadata(accCon, tablename) - .getCardinality(); - } catch (final PcjException e) { - e.printStackTrace(); - } - return cardinality; - } - - /** - * - * @param tableName - * @param conn - * @param groups - * - locality groups to be created - * - * Sets locality groups for more efficient scans - these are - * usually the variable orders in the table so that scans for - * specific orders are more efficient - */ - private void setLocalityGroups(final String tableName, final Connector conn, - final List<String> groups) { - - final HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); - for (int i = 0; i < groups.size(); i++) { - final HashSet<Text> tempColumn = new HashSet<Text>(); - tempColumn.add(new Text(groups.get(i))); - final String groupName = groups.get(i).replace(VALUE_DELIM, ""); - localityGroups.put(groupName, tempColumn); - } - - try { - conn.tableOperations().setLocalityGroups(tableName, localityGroups); - } catch (AccumuloException | AccumuloSecurityException - | TableNotFoundException e) { - e.printStackTrace(); - } - - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( - final BindingSet bindingset) throws QueryEvaluationException { - return 
this.evaluate(Collections.singleton(bindingset)); - } - - /** - * Core evaluation method used during query evaluation - given a collection - * of binding set constraints, this method finds common binding labels - * between the constraints and table, uses those to build a prefix scan of - * the Accumulo table, and creates a solution binding set by iterating of - * the scan results. - * @param bindingset - collection of {@link BindingSet}s to be joined with PCJ - * @return - CloseableIteration over joined results - */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( - final Collection<BindingSet> bindingset) - throws QueryEvaluationException { - - if (bindingset.isEmpty()) { - return new IteratorWrapper<BindingSet, QueryEvaluationException>( - new HashSet<BindingSet>().iterator()); - } - - final List<BindingSet> crossProductBs = new ArrayList<>(); - final Map<String, org.openrdf.model.Value> constantConstraints = new HashMap<>(); - final Set<Range> hashJoinRanges = new HashSet<>(); - final Range EMPTY_RANGE = new Range("", true, "~", false); - Range crossProductRange = EMPTY_RANGE; - String localityGroupOrder = varOrder.get(0); - int maxPrefixLen = Integer.MIN_VALUE; - int prefixLen = 0; - int oldPrefixLen = 0; - final Multimap<String, BindingSet> bindingSetHashMap = HashMultimap.create(); - HashJoinType joinType = HashJoinType.CONSTANT_JOIN_VAR; - final Set<String> unAssuredVariables = Sets.difference(getTupleExpr().getBindingNames(), getTupleExpr().getAssuredBindingNames()); - boolean useColumnScan = false; - boolean isCrossProd = false; - boolean containsConstantConstraints = false; - final BindingSet constants = getConstantConstraints(); - containsConstantConstraints = constants.size() > 0; - - try { - for (final BindingSet bs : bindingset) { - if (bindingset.size() == 1 && bs.size() == 0) { - // in this case, only single, empty bindingset, pcj node is - // first node in query plan - use full Range scan with - // column - // 
family set - useColumnScan = true; - } - // get common vars for PCJ - only use variables associated - // with assured Bindings - final QueryBindingSet commonVars = new QueryBindingSet(); - for (final String b : getTupleExpr().getAssuredBindingNames()) { - final Binding v = bs.getBinding(b); - if (v != null) { - commonVars.addBinding(v); - } - } - // no common vars implies cross product - if (commonVars.size() == 0 && bs.size() != 0) { - crossProductBs.add(bs); - isCrossProd = true; - } - //get a varOrder from orders in PCJ table - use at least - //one common variable - final BindingSetVariableOrder varOrder = getVarOrder( - commonVars.getBindingNames(), - constants.getBindingNames()); - - // update constant constraints not used in varOrder and - // update Bindings used to form range by removing unused - // variables - commonVars.addAll(constants); - if (commonVars.size() > varOrder.varOrderLen) { - final Map<String, Value> valMap = getConstantValueMap(); - for (final String s : new HashSet<String>(varOrder.unusedVars)) { - if (valMap.containsKey(s) - && !constantConstraints.containsKey(s)) { - constantConstraints.put(s, valMap.get(s)); - } - commonVars.removeBinding(s); - } - } - - if (containsConstantConstraints - && (useColumnScan || isCrossProd)) { - // only one range required in event of a cross product or - // empty BindingSet - // Range will either be full table Range or determined by - // constant constraints - if (crossProductRange == EMPTY_RANGE) { - crossProductRange = getRange(varOrder.varOrder, - commonVars); - localityGroupOrder = prefixToOrder(varOrder.varOrder); - } - } else if (!useColumnScan && !isCrossProd) { - // update ranges and add BindingSet to HashJoinMap if not a - // cross product - hashJoinRanges.add(getRange(varOrder.varOrder, commonVars)); - - prefixLen = varOrder.varOrderLen; - // check if common Variable Orders are changing between - // BindingSets (happens in case - // of Optional). 
If common variable set length changes from - // BindingSet to BindingSet - // update the HashJoinType to be VARIABLE_JOIN_VAR. - if (oldPrefixLen == 0) { - oldPrefixLen = prefixLen; - } else { - if (oldPrefixLen != prefixLen - && joinType == HashJoinType.CONSTANT_JOIN_VAR) { - joinType = HashJoinType.VARIABLE_JOIN_VAR; - } - oldPrefixLen = prefixLen; - } - // update max prefix len - if (prefixLen > maxPrefixLen) { - maxPrefixLen = prefixLen; - } - final String key = getHashJoinKey(varOrder.varOrder, commonVars); - bindingSetHashMap.put(key, bs); - } - - isCrossProd = false; - } - - // create full Range scan iterator and set column family if empty - // collection or if cross product BindingSet exists and no hash join - // BindingSets - if ((useColumnScan || crossProductBs.size() > 0) - && bindingSetHashMap.size() == 0) { - final Scanner scanner = accCon.createScanner(tablename, auths); - // cross product with no cross product constraints here - scanner.setRange(crossProductRange); - scanner.fetchColumnFamily(new Text(localityGroupOrder)); - return new PCJKeyToCrossProductBindingSetIterator(scanner, - crossProductBs, constantConstraints, unAssuredVariables, getTableVarMap()); - } else if ((useColumnScan || crossProductBs.size() > 0) - && bindingSetHashMap.size() > 0) { - - // in this case, both hash join BindingSets and cross product - // BindingSets exist - // create an iterator to evaluate cross product and an iterator - // for hash join, then combine - - final List<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorList = new ArrayList<>(); - - // create cross product iterator - final Scanner scanner1 = accCon.createScanner(tablename, auths); - scanner1.setRange(crossProductRange); - scanner1.fetchColumnFamily(new Text(localityGroupOrder)); - iteratorList.add(new PCJKeyToCrossProductBindingSetIterator( - scanner1, crossProductBs, constantConstraints, unAssuredVariables, - getTableVarMap())); - - // create hash join iterator - final BatchScanner 
scanner2 = accCon.createBatchScanner(tablename, auths, 10); - scanner2.setRanges(hashJoinRanges); - final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( - scanner2, getTableVarMap(), maxPrefixLen); - iteratorList.add(new BindingSetHashJoinIterator( - bindingSetHashMap, iterator, unAssuredVariables, joinType)); - - // combine iterators - return new IteratorCombiner(iteratorList); - - } else { - // only hash join BindingSets exist - final BatchScanner scanner = accCon.createBatchScanner(tablename, auths, 10); - // only need to create hash join iterator - scanner.setRanges(hashJoinRanges); - final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( - scanner, getTableVarMap(), maxPrefixLen); - return new BindingSetHashJoinIterator(bindingSetHashMap, - iterator, unAssuredVariables, joinType); - } - } catch (final Exception e) { - throw new QueryEvaluationException(e); - } - } - - private String getHashJoinKey(final String commonVarOrder, final BindingSet bs) { - final String[] commonVarArray = commonVarOrder.split(VAR_ORDER_DELIM); - String key = bs.getValue(commonVarArray[0]).toString(); - for (int i = 1; i < commonVarArray.length; i++) { - key = key + VALUE_DELIM + bs.getValue(commonVarArray[i]).toString(); - } - return key; - } - - private Range getRange(final String commonVarOrder, final BindingSet bs) - throws BindingSetConversionException { - final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - byte[] rangePrefix = new byte[0]; - rangePrefix = converter.convert(bs, new VariableOrder(commonVarOrder)); - return Range.prefix(new Text(rangePrefix)); - } - - /** - * - * @param variableBindingNames - * - names corresponding to variables - * @param constantBindingNames - * - names corresponding to constant constraints - * @return - {@link BindingSetVariableOrder} object containing largest - * possible supported variable order built from variableBindingNames - * and constantBindingNames - */ - 
private BindingSetVariableOrder getVarOrder( - final Set<String> variableBindingNames, final Set<String> constantBindingNames) { - final Map<String, Set<String>> varOrderMap = this - .getSupportedVariableOrders(); - final Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet(); - - Set<String> variables; - if (variableBindingNames.size() == 0 - && constantBindingNames.size() == 0) { - return new BindingSetVariableOrder("", 0, new HashSet<String>()); - } else if (variableBindingNames.size() > 0 - && constantBindingNames.size() == 0) { - variables = variableBindingNames; - } else if (variableBindingNames.size() == 0 - && constantBindingNames.size() > 0) { - variables = constantBindingNames; - } else { - variables = Sets.union(variableBindingNames, constantBindingNames); - - String maxPrefix = null; - int maxPrefixLen = 0; - Set<String> minUnusedVariables = null; - - for (final Map.Entry<String, Set<String>> e : entries) { - final Set<String> value = e.getValue(); - if (maxPrefixLen < value.size() - && variables.containsAll(value) - && Sets.intersection(value, variableBindingNames) - .size() > 0) { - maxPrefixLen = value.size(); - maxPrefix = e.getKey(); - minUnusedVariables = Sets.difference(variables, value); - if (maxPrefixLen == variables.size()) { - break; - } - } - } - return new BindingSetVariableOrder(maxPrefix, maxPrefixLen, - minUnusedVariables); - } - - String maxPrefix = null; - int maxPrefixLen = 0; - Set<String> minUnusedVariables = null; - - for (final Map.Entry<String, Set<String>> e : entries) { - final Set<String> value = e.getValue(); - if (maxPrefixLen < value.size() && variables.containsAll(value)) { - maxPrefixLen = value.size(); - maxPrefix = e.getKey(); - minUnusedVariables = Sets.difference(variables, value); - if (maxPrefixLen == variables.size()) { - break; - } - } - } - return new BindingSetVariableOrder(maxPrefix, maxPrefixLen, - minUnusedVariables); - } - - /** - * @return - all constraints which correspond to variables in - 
* {@link AccumuloIndexSet#getTupleExpr()} which are set equal to a
* constant, but are non-constant in Accumulo table
*/
private BindingSet getConstantConstraints() {
    final QueryBindingSet constants = new QueryBindingSet();
    for (final String name : this.getTableVarMap().keySet()) {
        // Constant-labelled variables are recognised by the shared name prefix;
        // their values come from the constant value map.
        if (name.startsWith(CONST_PREFIX)) {
            constants.addBinding(new BindingImpl(name, getConstantValueMap().get(name)));
        }
    }
    return constants;
}

/**
 * Given a partial order of query variables, converts it to PCJ variables
 * through {@link #getTableVarMap()} and looks for a full PCJ variable order
 * that begins with the converted partial order. The match is made on whole
 * variable names, so a partial order ending in {@code x} does not match a
 * full order whose next variable merely starts with {@code x}. The order
 * that is returned is the matching element of {@code varOrder}, unchanged.
 *
 * @param order - prefix of a full variable order
 * @return - full variable order that includes all variables whose values
 *         are stored in the table - used to obtain the locality group
 * @throws NoSuchElementException if the converted order is not a prefix of
 *         any known full variable order
 */
private String prefixToOrder(String order) {
    final Map<String, String> tableVarMap = this.getTableVarMap();

    // get order in terms of PCJ variables
    final String[] pcjVars = order.split(VAR_ORDER_DELIM);
    for (int i = 0; i < pcjVars.length; i++) {
        pcjVars[i] = tableVarMap.get(pcjVars[i]);
    }
    final String pcjPrefix = Joiner.on(VAR_ORDER_DELIM).join(pcjVars);
    final String pcjPrefixWithDelim = pcjPrefix + VAR_ORDER_DELIM;

    for (final String fullOrder : varOrder) {
        // verify that partial order is a prefix of a PCJ varOrder, ending
        // exactly on a variable boundary
        if (fullOrder.equals(pcjPrefix) || fullOrder.startsWith(pcjPrefixWithDelim)) {
            return fullOrder;
        }
    }
    throw new NoSuchElementException("Order is not a prefix of any locality group value!");
}


/**
 * Simple holder for a selected variable order: the serialized order, the
 * number of variables in it, and the variables that were left out of it.
 */
private static class BindingSetVariableOrder {

    Set<String> unusedVars;
    int varOrderLen = 0;
    String varOrder;

    public BindingSetVariableOrder(final String varOrder, final int len,
            final Set<String> unused) {
        this.varOrder = varOrder;
        this.varOrderLen = len;
        this.unusedVars = unused;
    }

}

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java deleted file mode 100644 index b53dd66..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java +++ /dev/null @@ -1,309 +0,0 @@ -package mvm.rya.indexing.external.tupleSet; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.evaluation.impl.ExternalSet; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; - -import com.beust.jcommander.internal.Sets; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * This is an abstract class for delegating the evaluation of part - * of a SPARQL query to an external source. The {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()} - * represents the SPARQL string that this node evaluates, and the map returned by {@link ExternalTupleSet#getTableVarMap()} - * maps the variables of TupleExpr to the variables stored in the external store (which may be different). The map - * returned by {@link ExternalTupleSet#getSupportedVariableOrderMap()} contains all the variable orders in which - * data is written to the supporting table, and is useful for determining which {@link BindingSet} can be passed into - * {@link ExternalTupleSet#evaluate(BindingSet)}. 
- * - */ -public abstract class ExternalTupleSet extends ExternalSet { - - public static final String VAR_ORDER_DELIM = ";"; - public static final String CONST_PREFIX = "-const-"; - public static final String VALUE_DELIM = "\u0000"; - private Projection tupleExpr; - private Map<String, String> tableVarMap = Maps.newHashMap(); //maps vars in tupleExpr to var in stored binding sets - private Map<String, Set<String>> supportedVarOrders = Maps.newHashMap(); //indicates supported var orders - private Map<String, org.openrdf.model.Value> valMap; - - public ExternalTupleSet() { - } - - public ExternalTupleSet(Projection tupleExpr) { - Preconditions.checkNotNull(tupleExpr); - this.tupleExpr = tupleExpr; - valMap = getValMap(); - updateTableVarMap(tupleExpr, tupleExpr); - } - - @Override - abstract public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) throws QueryEvaluationException; - - @Override - public Set<String> getBindingNames() { - return tupleExpr.getBindingNames(); - } - - @Override - public Set<String> getAssuredBindingNames() { - return tupleExpr.getAssuredBindingNames(); - } - - @Override - public String getSignature() { - return "(External Projection) " + Joiner.on(", ").join(tupleExpr.getProjectionElemList().getElements()).replaceAll("\\s+", " "); - } - - public Projection getTupleExpr() { - return tupleExpr; - } - - public void setProjectionExpr(Projection tupleExpr) { - Preconditions.checkNotNull(tupleExpr); - if(this.tupleExpr == null) { - updateTableVarMap(tupleExpr, tupleExpr); - } else { - updateTableVarMap(tupleExpr, this.tupleExpr); - } - this.tupleExpr = tupleExpr; - valMap = getValMap(); - if (supportedVarOrders.size() != 0) { - updateSupportedVarOrderMap(); - } - } - - public void setTableVarMap(Map<String,String> vars) { - Preconditions.checkNotNull(vars); - this.tableVarMap = vars; - } - - public Map<String, String> getTableVarMap() { - return this.tableVarMap; - } - - public void 
setSupportedVariableOrderMap(Map<String, Set<String>> varOrders) { - Preconditions.checkNotNull(varOrders); - this.supportedVarOrders = varOrders; - } - - public void setSupportedVariableOrderMap(List<String> varOrders) { - Preconditions.checkNotNull(varOrders); - this.supportedVarOrders = createSupportedVarOrderMap(varOrders); - } - - public Map<String, Set<String>> getSupportedVariableOrderMap() { - return supportedVarOrders; - } - - public Map<String, org.openrdf.model.Value> getConstantValueMap() { - return valMap; - } - - @Override - public ExternalSet clone() { - final ExternalTupleSet clone = (ExternalTupleSet) super.clone(); - clone.setProjectionExpr(this.tupleExpr.clone()); - clone.tableVarMap = Maps.newHashMap(); - for(final String s: this.tableVarMap.keySet()) { - clone.tableVarMap.put(s,this.tableVarMap.get(s)); - } - clone.supportedVarOrders = Maps.newHashMap(); - for(final String s: this.supportedVarOrders.keySet()) { - clone.supportedVarOrders.put(s,this.supportedVarOrders.get(s)); - } - return clone; - } - - public Map<String, Set<String>> getSupportedVariableOrders() { - return supportedVarOrders; - } - - public boolean supportsBindingSet(Set<String> bindingNames) { - final Collection<Set<String>> values = supportedVarOrders.values(); - final Set<String> bNames = Sets.newHashSet(); - final Set<String> bNamesWithConstants = Sets.newHashSet(); - - for (final String s : this.getTupleExpr().getBindingNames()) { - if (bindingNames.contains(s)) { - bNames.add(s); - bNamesWithConstants.add(s); - } else if(s.startsWith(CONST_PREFIX)) { - bNamesWithConstants.add(s); - } - } - return values.contains(bNames) || values.contains(bNamesWithConstants); - } - - /** - * @param tupleMatch - * - project expression - call after setting {@link tupleExpr} to - * map new variables to old -- the variables in the binding list - * of the new tupleExpr (tupleMatch) must map to the - * corresponding variables in the binding list of the old - * tupleExpr - */ - private void 
updateTableVarMap(TupleExpr newTuple, TupleExpr oldTuple) { - - final List<String> replacementVars = Lists.newArrayList(newTuple - .getBindingNames()); - final List<String> tableVars = Lists.newArrayList(oldTuple - .getBindingNames()); - - final Map<String, String> tableMap = Maps.newHashMap(); - - for (int i = 0; i < tableVars.size(); i++) { - tableMap.put(replacementVars.get(i), tableVars.get(i)); - } - this.setTableVarMap(tableMap); - } - - /** - * call after setting {@link tableVarMap} to update map of supported - * variables in terms of variables in new tupleExpr - */ - private void updateSupportedVarOrderMap() { - - Preconditions.checkArgument(supportedVarOrders.size() != 0);; - final Map<String, Set<String>> newSupportedOrders = Maps.newHashMap(); - final BiMap<String, String> biMap = HashBiMap.create(tableVarMap) - .inverse(); - Set<String> temp = null; - final Set<String> keys = supportedVarOrders.keySet(); - - for (final String s : keys) { - temp = supportedVarOrders.get(s); - final Set<String> newSet = Sets.newHashSet(); - - for (final String t : temp) { - newSet.add(biMap.get(t)); - } - - final String[] tempStrings = s.split(VAR_ORDER_DELIM); - String v = ""; - for (final String u : tempStrings) { - if (v.length() == 0) { - v = v + biMap.get(u); - } else { - v = v + VAR_ORDER_DELIM + biMap.get(u); - } - } - newSupportedOrders.put(v, newSet); - } - supportedVarOrders = newSupportedOrders; - } - - /** - * - * @param orders - * @return - map with all possible orders in which results are written to the table - */ - private Map<String, Set<String>> createSupportedVarOrderMap(List<String> orders) { - final Map<String, Set<String>> supportedVars = Maps.newHashMap(); - - for (final String t : orders) { - final String[] tempOrder = t.split(VAR_ORDER_DELIM); - final Set<String> varSet = Sets.newHashSet(); - String u = ""; - for (final String s : tempOrder) { - if(u.length() == 0) { - u = s; - } else{ - u = u+ VAR_ORDER_DELIM + s; - } - varSet.add(s); - 
supportedVars.put(u, new HashSet<String>(varSet)); - } - } - return supportedVars; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ExternalTupleSet)) { - return false; - } else { - final ExternalTupleSet arg = (ExternalTupleSet) other; - if (this.getTupleExpr().equals(arg.getTupleExpr())) { - return true; - } else { - return false; - } - } - } - - @Override - public int hashCode() { - int result = 17; - result = 31*result + tupleExpr.hashCode(); - return result; - } - - private Map<String, org.openrdf.model.Value> getValMap() { - ValueMapVisitor valMapVis = new ValueMapVisitor(); - tupleExpr.visit(valMapVis); - return valMapVis.getValMap(); - } - - - /** - * - * Extracts the values associated with constant labels in the query Used to - * create binding sets from range scan - */ - private class ValueMapVisitor extends - QueryModelVisitorBase<RuntimeException> { - Map<String, org.openrdf.model.Value> valMap = Maps.newHashMap(); - - public Map<String, org.openrdf.model.Value> getValMap() { - return valMap; - } - - @Override - public void meet(Var node) { - if (node.getName().startsWith("-const-")) { - valMap.put(node.getName(), node.getValue()); - } - } - } - - -}
