RYA-105: Fluo Integration
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d9aaca79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d9aaca79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d9aaca79 Branch: refs/heads/master Commit: d9aaca79cc1c69ca049c794f8b35e72e877e808e Parents: dbd46e7 Author: Caleb Meier <[email protected]> Authored: Wed Jul 6 12:26:38 2016 -0400 Committer: pujav65 <[email protected]> Committed: Thu Jul 21 09:09:39 2016 -0400 ---------------------------------------------------------------------- .../java/mvm/rya/accumulo/AccumuloRyaDAO.java | 62 ++-- .../mvm/rya/indexing/accumulo/ConfigUtils.java | 48 ++- .../external/PrecomputedJoinIndexer.java | 130 ++++--- .../external/PrecomputedJoinIndexerConfig.java | 15 +- .../external/fluo/FluoPcjUpdaterConfig.java | 5 +- .../external/fluo/FluoPcjUpdaterSupplier.java | 17 +- .../rya/indexing/external/fluo/NoOpUpdater.java | 53 +++ .../external/fluo/NoOpUpdaterSupplier.java | 39 +++ .../fluo/PcjUpdaterSupplierFactory.java | 57 +++ .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 47 ++- .../apache/rya/indexing/pcj/fluo/ITBase.java | 345 +++++++++++-------- .../pcj/fluo/api/CountStatementsIT.java | 23 +- .../pcj/fluo/integration/RyaExportIT.java | 16 +- .../RyaInputIncrementalUpdateIT.java | 200 +++++++++++ .../pcj/fluo/visibility/PcjVisibilityIT.java | 18 +- 15 files changed, 776 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java index b10c522..ed991e1 100644 --- 
a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java @@ -8,9 +8,9 @@ package mvm.rya.accumulo; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,6 +31,7 @@ import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; +import info.aduna.iteration.CloseableIteration; import java.io.IOException; import java.util.Collection; @@ -40,6 +41,18 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.resolver.RyaTripleContext; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchDeleter; @@ -64,21 +77,6 @@ import org.openrdf.model.Namespace; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import 
info.aduna.iteration.CloseableIteration; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.RyaNamespaceManager; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); @@ -111,8 +109,9 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName @Override public void init() throws RyaDAOException { - if (initialized) + if (initialized) { return; + } try { checkNotNull(conf); checkNotNull(connector); @@ -131,7 +130,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName secondaryIndexers = conf.getAdditionalIndexers(); flushEachUpdate = conf.flushEachUpdate(); - + TableOperations tableOperations = connector.tableOperations(); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); @@ -150,7 +149,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs()); - + for (AccumuloIndexer index : secondaryIndexers) { index.setConnector(connector); index.setMultiTableBatchWriter(mt_bw); @@ -248,9 +247,15 @@ public class AccumuloRyaDAO implements 
RyaDAO<AccumuloRdfConfiguration>, RyaName } catch (Exception e) { throw new RyaDAOException(e); } finally { - if (bd_spo != null) bd_spo.close(); - if (bd_po != null) bd_po.close(); - if (bd_osp != null) bd_osp.close(); + if (bd_spo != null) { + bd_spo.close(); + } + if (bd_po != null) { + bd_po.close(); + } + if (bd_osp != null) { + bd_osp.close(); + } } } @@ -520,10 +525,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); } - private void checkVersion() throws RyaDAOException { + private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException { String version = getVersion(); if (version == null) { - this.add(getVersionRyaStatement()); + //adding to core Rya tables but not Indexes + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement()); + Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + bw_spo.addMutations(spo); + bw_po.addMutations(po); + bw_osp.addMutations(osp); } //TODO: Do a version check here } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java index 28afce7..c15ba1b 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java @@ -25,6 +25,21 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import mvm.rya.accumulo.AccumuloRdfConfiguration; 
+import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.FilterFunctionOptimizer; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.entity.EntityOptimizer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.external.PrecomputedJoinIndexer; +import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer; +import mvm.rya.indexing.pcj.matching.PCJOptimizer; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -47,22 +62,9 @@ import org.apache.log4j.Logger; import org.openrdf.model.URI; import org.openrdf.model.impl.URIImpl; +import com.google.common.base.Optional; import com.google.common.collect.Lists; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.FilterFunctionOptimizer; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; -import mvm.rya.indexing.accumulo.entity.EntityOptimizer; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; -import mvm.rya.indexing.accumulo.freetext.Tokenizer; -import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; -import mvm.rya.indexing.mongodb.geo.MongoGeoIndexer; -import mvm.rya.indexing.pcj.matching.PCJOptimizer; - /** * A set of configuration utils to read a Hadoop {@link Configuration} object and create 
Cloudbase/Accumulo objects. */ @@ -96,6 +98,12 @@ public class ConfigUtils { public static final String USE_PCJ = "sc.use_pcj"; public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; + public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; + public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; + public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; + + public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail"; public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail"; @@ -366,6 +374,17 @@ public class ConfigUtils { return conf.getBoolean(USE_OPTIMAL_PCJ, false); } + + /** + * @return The name of the Fluo Application this instance of RYA is + * using to incrementally update PCJs. + */ + //TODO delete this eventually and use Details table + public Optional<String> getFluoAppName(Configuration conf) { + return Optional.fromNullable(conf.get(FLUO_APP_NAME)); + } + + public static boolean getUseMongo(final Configuration conf) { return conf.getBoolean(USE_MONGO, false); } @@ -391,6 +410,7 @@ public class ConfigUtils { if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { conf.setPcjOptimizer(PCJOptimizer.class); + indexList.add(PrecomputedJoinIndexer.class.getName()); } if (getUseGeo(conf)) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java index 6aaf2c4..0324a79 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java +++ 
b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java @@ -28,6 +28,15 @@ import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; +import mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory; + import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.hadoop.conf.Configuration; @@ -41,21 +50,13 @@ import org.openrdf.model.URI; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; -import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier; - /** * Updates the state of the Precomputed Join indices that are used by Rya. */ @ParametersAreNonnullByDefault -public class PrecomputedJoinIndexer implements AccumuloIndexer { - private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class); +public class PrecomputedJoinIndexer extends AbstractAccumuloIndexer { + private static final Logger log = Logger + .getLogger(PrecomputedJoinIndexer.class); /** * This configuration object must be set before {@link #init()} is invoked. 
@@ -64,14 +65,14 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { private Optional<Configuration> conf = Optional.absent(); /** - * The Accumulo Connector that must be used when accessing an Accumulo storage. - * This value is provided by {@link #setConnector(Connector)}. + * The Accumulo Connector that must be used when accessing an Accumulo + * storage. This value is provided by {@link #setConnector(Connector)}. */ private Optional<Connector> accumuloConn = Optional.absent(); /** - * Provides access to the {@link Configuration} that was provided to this class - * using {@link #setConf(Configuration)}. + * Provides access to the {@link Configuration} that was provided to this + * class using {@link #setConf(Configuration)}. */ private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() { @Override @@ -92,23 +93,22 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { }; /** - * Creates and grants access to the {@link PrecomputedJoinStorage} that will be used - * to interact with the PCJ results that are stored and used by Rya. + * Creates and grants access to the {@link PrecomputedJoinStorage} that will + * be used to interact with the PCJ results that are stored and used by Rya. */ - private final PrecomputedJoinStorageSupplier pcjStorageSupplier = - new PrecomputedJoinStorageSupplier( - configSupplier, - new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier)); + private final PrecomputedJoinStorageSupplier pcjStorageSupplier = new PrecomputedJoinStorageSupplier( + configSupplier, new AccumuloPcjStorageSupplier(configSupplier, + accumuloSupplier)); + + private PrecomputedJoinStorage pcjStorage; /** - * Creates and grants access to the {@link PrecomputedJoinUpdater} that will + * Creates and grants access to the {@link PrecomputedJoinUpdater}s that will * be used to update the state stored within the PCJ tables that are stored * in Accumulo. 
*/ - private final PrecomputedJoinUpdaterSupplier pcjUpdaterSupplier = - new PrecomputedJoinUpdaterSupplier( - configSupplier, - new FluoPcjUpdaterSupplier(configSupplier)); + private Supplier<PrecomputedJoinUpdater> updaterSupplier; + @Override public void setConf(final Configuration conf) { @@ -127,7 +127,7 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { @Override public void setConnector(final Connector connector) { checkNotNull(connector); - accumuloConn = Optional.of( connector ); + accumuloConn = Optional.of(connector); } /** @@ -135,40 +135,48 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { */ @Override public void init() { - pcjStorageSupplier.get(); - pcjUpdaterSupplier.get(); + pcjStorage = pcjStorageSupplier.get(); + updaterSupplier = new PcjUpdaterSupplierFactory(configSupplier).getSupplier(); + updaterSupplier.get(); } @Override public void storeStatement(final RyaStatement statement) throws IOException { checkNotNull(statement); - storeStatements( Collections.singleton(statement) ); + storeStatements(Collections.singleton(statement)); } @Override - public void storeStatements(final Collection<RyaStatement> statements) throws IOException { + public void storeStatements(final Collection<RyaStatement> statements) + throws IOException { checkNotNull(statements); try { - pcjUpdaterSupplier.get().addStatements(statements); + updaterSupplier.get().addStatements(statements); } catch (final PcjUpdateException e) { - throw new IOException("Could not update the PCJs by adding the provided statements.", e); + throw new IOException( + "Could not update the PCJs by adding the provided statements.", + e); } } @Override - public void deleteStatement(final RyaStatement statement) throws IOException { + public void deleteStatement(final RyaStatement statement) + throws IOException { checkNotNull(statement); try { - pcjUpdaterSupplier.get().deleteStatements( Collections.singleton(statement) ); + Collection<RyaStatement> statements 
= Collections.singleton(statement); + updaterSupplier.get().deleteStatements(statements); } catch (final PcjUpdateException e) { - throw new IOException("Could not update the PCJs by removing the provided statement.", e); + throw new IOException( + "Could not update the PCJs by removing the provided statement.", + e); } } @Override public void flush() throws IOException { try { - pcjUpdaterSupplier.get().flush(); + updaterSupplier.get().flush(); } catch (final PcjUpdateException e) { throw new IOException("Could not flush the PCJ Updater.", e); } @@ -177,13 +185,13 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { @Override public void close() { try { - pcjStorageSupplier.get().close(); + pcjStorage.close(); } catch (final PCJStorageException e) { log.error("Could not close the PCJ Storage instance.", e); } try { - pcjUpdaterSupplier.get().close(); + updaterSupplier.get().close(); } catch (final PcjUpdateException e) { log.error("Could not close the PCJ Updater instance.", e); } @@ -198,48 +206,56 @@ public class PrecomputedJoinIndexer implements AccumuloIndexer { } /** - * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}. + * Deletes all data from the PCJ indices that are managed by a + * {@link PrecomputedJoinStorage}. 
*/ @Override public void purge(final RdfCloudTripleStoreConfiguration configuration) { - final PrecomputedJoinStorage storage = pcjStorageSupplier.get(); try { - for(final String pcjId : storage.listPcjs()) { + for (final String pcjId : pcjStorage.listPcjs()) { try { - storage.purge(pcjId); - } catch(final PCJStorageException e) { - log.error("Could not purge the PCJ index with id: " + pcjId, e); + pcjStorage.purge(pcjId); + } catch (final PCJStorageException e) { + log.error( + "Could not purge the PCJ index with id: " + pcjId, + e); } } } catch (final PCJStorageException e) { - log.error("Could not purge the PCJ indicies because they could not be listed.", e); + log.error( + "Could not purge the PCJ indicies because they could not be listed.", + e); } } /** - * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}. + * Deletes all of the PCJ indices that are managed by + * {@link PrecomputedJoinStorage}. */ @Override public void dropAndDestroy() { - final PrecomputedJoinStorage storage = pcjStorageSupplier.get(); - try { - for(final String pcjId : storage.listPcjs()) { + for (final String pcjId : pcjStorage.listPcjs()) { try { - storage.dropPcj(pcjId); - } catch(final PCJStorageException e) { - log.error("Could not delete the PCJ index with id: " + pcjId, e); + pcjStorage.dropPcj(pcjId); + } catch (final PCJStorageException e) { + log.error("Could not delete the PCJ index with id: " + + pcjId, e); } } - } catch(final PCJStorageException e) { - log.error("Could not delete the PCJ indicies because they could not be listed.", e); + } catch (final PCJStorageException e) { + log.error( + "Could not delete the PCJ indicies because they could not be listed.", + e); } } @Override - public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException { - // We do not need to use the writer that also writes to the core RYA tables. 
+ public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) + throws IOException { + // We do not need to use the writer that also writes to the core RYA + // tables. } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java index c56f574..3c76601 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java @@ -22,14 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import javax.annotation.ParametersAreNonnullByDefault; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; + import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import com.google.common.base.Optional; -import mvm.rya.api.persist.index.RyaSecondaryIndexer; - /** * Inspects the {@link Configuration} object that is provided to all instances * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer} @@ -55,7 +56,7 @@ public class PrecomputedJoinIndexerConfig { /** * Incrementally updates the PCJs is pseudo-realtime new adds/deletes are encountered. */ - FLUO; + FLUO, NO_UPDATE; } // Indicates which implementation of PrecomputedJoinStorage to use. @@ -63,6 +64,7 @@ public class PrecomputedJoinIndexerConfig { // Indicates which implementation of PrecomputedJoinUpdater to use. 
public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; + public static final String USE_PCJ_FLUO_UPDATER = ConfigUtils.USE_PCJ_FLUO_UPDATER; // The configuration object that is provided to Secondary Indexing implementations. private final Configuration config; @@ -104,6 +106,13 @@ public class PrecomputedJoinIndexerConfig { return Optional.fromNullable(updaterType); } + + + public boolean getUseFluoUpdater() { + return config.getBoolean(USE_PCJ_FLUO_UPDATER, false); + } + + /** * @return The configuration object that has been wrapped. */ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java index 4378a4a..2a34a82 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java @@ -19,20 +19,19 @@ package mvm.rya.indexing.external.fluo; import static com.google.common.base.Preconditions.checkNotNull; +import mvm.rya.indexing.accumulo.ConfigUtils; import org.apache.hadoop.conf.Configuration; import com.google.common.base.Optional; -import mvm.rya.indexing.accumulo.ConfigUtils; - /** * Configuration values required to initialize a {@link FluoPcjUpdater}. */ public final class FluoPcjUpdaterConfig { // Defines which Fluo application is running for this instance of Rya. - public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + public static final String FLUO_APP_NAME = ConfigUtils.FLUO_APP_NAME; // Values that define which Accumulo instance hosts the Fluo application's table. 
public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java index ed202f4..61de078 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java @@ -26,25 +26,26 @@ import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERN import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS; import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME; import static mvm.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY; +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; import javax.annotation.ParametersAreNonnullByDefault; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; + import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import io.fluo.api.client.FluoClient; -import io.fluo.api.client.FluoFactory; -import io.fluo.api.config.FluoConfiguration; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; - /** * Creates instances of {@link FluoPcjUpdater} using the values found in a {@link Configuration}. 
*/ @ParametersAreNonnullByDefault -public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> { +public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { private final Supplier<Configuration> configSupplier; @@ -66,7 +67,7 @@ public class FluoPcjUpdaterSupplier implements Supplier<FluoPcjUpdater> { final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType(); - checkArgument(updaterType.isPresent() && (updaterType.get() == PrecomputedJoinUpdaterType.FLUO), + checkArgument(updaterType.isPresent() && updaterType.get() == PrecomputedJoinUpdaterType.FLUO, "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE + "' value be set to '" + PrecomputedJoinUpdaterType.FLUO + "'."); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java new file mode 100644 index 0000000..66a6b24 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdater.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.fluo; + +import java.util.Collection; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.external.PrecomputedJoinIndexer; + +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +/** + * A NoOp updater (which does nothing) to be used by {@link PrecomputedJoinIndexer} if neither Batch nor + * {@link FluoPcjUpdater} is specified by the user to update Precomputed Joins. + * + */ +public class NoOpUpdater implements PrecomputedJoinUpdater { + + @Override + public void addStatements(Collection<RyaStatement> statements) + throws PcjUpdateException { + } + + @Override + public void deleteStatements(Collection<RyaStatement> statements) + throws PcjUpdateException { + } + + @Override + public void flush() throws PcjUpdateException { + } + + @Override + public void close() throws PcjUpdateException { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java new file mode 100644 index 0000000..6831353 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/NoOpUpdaterSupplier.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.fluo; + +import mvm.rya.indexing.external.PrecomputedJoinIndexer; + +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +import com.google.common.base.Supplier; + +/** + * A {@link Supplier} for {@link NoOpUdater}s. This Supplier is used by + * {@link PrecomputedJoinIndexer} when no update strategy is specified by the user. + * + */ +public class NoOpUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { + + @Override + public NoOpUpdater get() { + return new NoOpUpdater(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java new file mode 100644 index 0000000..0250b6d --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/PcjUpdaterSupplierFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.fluo; + +import mvm.rya.indexing.external.PrecomputedJoinIndexer; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +/** + * A factory for {@link Supplier}s used by {@link PrecomputedJoinIndexer} to + * get all update strategies for precomputed joins for a given Rya instance. + * + */ +public class PcjUpdaterSupplierFactory { + + private Supplier<Configuration> configSupplier; + + public PcjUpdaterSupplierFactory(Supplier<Configuration> configSupplier) { + this.configSupplier = configSupplier; + } + + public Supplier<PrecomputedJoinUpdater> getSupplier() { + + PrecomputedJoinIndexerConfig config = new PrecomputedJoinIndexerConfig(configSupplier.get()); + //TODO this should not be read from the config. 
Instead, + //this information should be retrieved from the RyaDetails table + if(config.getUseFluoUpdater()) { + return Suppliers.memoize(new FluoPcjUpdaterSupplier(configSupplier)); + } + else { + return Suppliers.memoize(new NoOpUpdaterSupplier()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index 6ef282f..6ca8cd7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -1,25 +1,15 @@ <?xml version="1.0" encoding="utf-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project - xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. 
The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> @@ -27,13 +17,13 @@ under the License. <artifactId>rya.pcj.fluo.parent</artifactId> <version>3.2.10-SNAPSHOT</version> </parent> - + <modelVersion>4.0.0</modelVersion> <artifactId>rya.pcj.fluo.integration</artifactId> - + <name>Apache Rya PCJ Fluo Integration Tests</name> <description>Integration tests for the Rya Fluo application.</description> - + <dependencies> <!-- Rya Runtime Dependencies. --> <dependency> @@ -48,7 +38,10 @@ under the License. <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.client</artifactId> </dependency> - + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> <!-- Testing dependencies. --> <dependency> <groupId>io.fluo</groupId> @@ -60,5 +53,9 @@ under the License. 
<artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java index 618cab9..154156f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java @@ -20,8 +20,23 @@ package org.apache.rya.indexing.pcj.fluo; import static com.google.common.base.Preconditions.checkNotNull; +import io.fluo.api.client.FluoAdmin; +import io.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import io.fluo.api.client.FluoAdmin.TableExistsException; +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.client.Snapshot; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.mini.MiniFluo; + import java.io.File; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -29,6 +44,20 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.UUID; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import 
mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; +import mvm.rya.rdftriplestore.RyaSailRepository; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import mvm.rya.sail.config.RyaSailFactory; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -37,6 +66,7 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; @@ -57,112 +87,91 @@ import org.openrdf.query.BindingSet; import org.openrdf.query.impl.MapBindingSet; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; import com.google.common.io.Files; -import io.fluo.api.client.FluoClient; -import io.fluo.api.client.FluoFactory; -import io.fluo.api.client.Snapshot; -import io.fluo.api.config.FluoConfiguration; -import io.fluo.api.config.ObserverConfiguration; -import io.fluo.api.config.ScannerConfiguration; -import io.fluo.api.data.Bytes; -import io.fluo.api.iterator.ColumnIterator; -import io.fluo.api.iterator.RowIterator; -import io.fluo.api.mini.MiniFluo; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import 
mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.rdftriplestore.RdfCloudTripleStore; -import mvm.rya.rdftriplestore.RyaSailRepository; - /** - * Integration tests that ensure the Fluo application processes PCJs results correctly. + * Integration tests that ensure the Fluo application processes PCJs results + * correctly. * <p> * This class is being ignored because it doesn't contain any unit tests. */ public abstract class ITBase { private static final Logger log = Logger.getLogger(ITBase.class); - public static final String USE_MOCK_INSTANCE = ".useMockInstance"; - public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; - public static final String CLOUDBASE_USER = "sc.cloudbase.username"; - public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; - protected static final String RYA_TABLE_PREFIX = "demo_"; + + protected static final String ACCUMULO_USER = "root"; + protected static final String ACCUMULO_PASSWORD = "password"; // Rya data store and connections. - protected MiniAccumuloCluster accumulo = null; - protected static Connector accumuloConn = null; protected RyaSailRepository ryaRepo = null; protected RepositoryConnection ryaConn = null; + + // Mini Accumulo Cluster + protected MiniAccumuloCluster cluster; + protected static Connector accumuloConn = null; + protected String instanceName = null; + protected String zookeepers = null; + // Fluo data store and connections. protected MiniFluo fluo = null; protected FluoClient fluoClient = null; + protected final String appName = "IntegrationTests"; @Before - public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException { - // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it. 
- accumulo = startMiniAccumulo(); - - // Setup the Rya library to use the Mini Accumulo. - ryaRepo = setupRya(accumulo); - ryaConn = ryaRepo.getConnection(); - + public void setupMiniResources() + throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException, + RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, TableExistsException { + // Initialize the Mini Accumulo that will be used to host Rya and Fluo. + setupMiniAccumulo(); + // Initialize the Mini Fluo that will be used to store created queries. fluo = startMiniFluo(); - fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() ); - } + fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); + // Initialize the Rya that will be used by the tests. + ryaRepo = setupRya(ACCUMULO_USER, ACCUMULO_PASSWORD, instanceName, zookeepers, appName); + ryaConn = ryaRepo.getConnection(); + } + @After public void shutdownMiniResources() { - if(ryaConn != null) { + // TODO shutdown the cluster + + if (ryaConn != null) { try { log.info("Shutting down Rya Connection."); ryaConn.close(); log.info("Rya Connection shut down."); - } catch(final Exception e) { + } catch (final Exception e) { log.error("Could not shut down the Rya Connection.", e); } } - if(ryaRepo != null) { + if (ryaRepo != null) { try { log.info("Shutting down Rya Repo."); ryaRepo.shutDown(); log.info("Rya Repo shut down."); - } catch(final Exception e) { + } catch (final Exception e) { log.error("Could not shut down the Rya Repo.", e); } } - - if(accumulo != null) { - try { - log.info("Shutting down the Mini Accumulo being used as a Rya store."); - accumulo.stop(); - log.info("Mini Accumulo being used as a Rya store shut down."); - } catch(final Exception e) { - log.error("Could not shut down the Mini Accumulo.", e); - } - } - - if(fluoClient != null) { + + if (fluoClient != null) { try { log.info("Shutting down Fluo Client."); fluoClient.close(); 
log.info("Fluo Client shut down."); - } catch(final Exception e) { + } catch (final Exception e) { log.error("Could not shut down the Fluo Client.", e); } } - if(fluo != null) { + if (fluo != null) { try { log.info("Shutting down Mini Fluo."); fluo.close(); @@ -171,28 +180,44 @@ public abstract class ITBase { log.error("Could not shut down the Mini Fluo.", e); } } + + if(cluster != null) { + try { + log.info("Shutting down the Mini Accumulo being used as a Rya store."); + cluster.stop(); + log.info("Mini Accumulo being used as a Rya store shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } } /** - * A helper fuction for creating a {@link BindingSet} from an array of {@link Binding}s. + * A helper fuction for creating a {@link BindingSet} from an array of + * {@link Binding}s. * - * @param bindings - The bindings to include in the set. (not null) + * @param bindings + * - The bindings to include in the set. (not null) * @return A {@link BindingSet} holding the bindings. */ protected static BindingSet makeBindingSet(final Binding... bindings) { final MapBindingSet bindingSet = new MapBindingSet(); - for(final Binding binding : bindings) { + for (final Binding binding : bindings) { bindingSet.addBinding(binding); } return bindingSet; } /** - * A helper function for creating a {@link RyaStatement} that represents a Triple. + * A helper function for creating a {@link RyaStatement} that represents a + * Triple. * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) * @return A Triple as a {@link RyaStatement}. 
*/ protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) { @@ -200,44 +225,48 @@ public abstract class ITBase { checkNotNull(predicate); checkNotNull(object); - final RyaStatementBuilder builder = RyaStatement.builder() - .setSubject( new RyaURI(subject) ) - .setPredicate( new RyaURI(predicate) ); + final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject)) + .setPredicate(new RyaURI(predicate)); - if(object.startsWith("http://")) { - builder.setObject(new RyaURI(object) ); + if (object.startsWith("http://")) { + builder.setObject(new RyaURI(object)); } else { - builder.setObject( new RyaType(object) ); + builder.setObject(new RyaType(object)); } return builder.build(); } /** - * A helper function for creating a {@link RyaStatement} that represents a Triple. + * A helper function for creating a {@link RyaStatement} that represents a + * Triple. * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) * @return A Triple as a {@link RyaStatement}. */ protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) { checkNotNull(subject); checkNotNull(predicate); - return RyaStatement.builder() - .setSubject(new RyaURI(subject)) - .setPredicate(new RyaURI(predicate)) - .setObject( new RyaType(XMLSchema.INT, "" + object) ) - .build(); + return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate)) + .setObject(new RyaType(XMLSchema.INT, "" + object)).build(); } /** - * A helper function for creating a Sesame {@link Statement} that represents a Triple.. 
+ * A helper function for creating a Sesame {@link Statement} that represents + * a Triple.. * - * @param subject - The Subject of the Triple. (not null) - * @param predicate - The Predicate of the Triple. (not null) - * @param object - The Object of the Triple. (not null) + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) * @return A Triple as a {@link Statement}. */ protected static Statement makeStatement(final String subject, final String predicate, final String object) { @@ -253,14 +282,17 @@ public abstract class ITBase { * Fetches the binding sets that are the results of a specific SPARQL query * from the Fluo table. * - * @param fluoClient- A connection to the Fluo table where the results reside. (not null) - * @param sparql - This query's results will be fetched. (not null) + * @param fluoClient- + * A connection to the Fluo table where the results reside. (not + * null) + * @param sparql + * - This query's results will be fetched. (not null) * @return The binding sets for the query's results. */ protected static Set<BindingSet> getQueryBindingSetValues(final FluoClient fluoClient, final String sparql) { final Set<BindingSet> bindingSets = new HashSet<>(); - try(Snapshot snapshot = fluoClient.newSnapshot()) { + try (Snapshot snapshot = fluoClient.newSnapshot()) { final String queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID).toString(); // Fetch the query's variable order. @@ -269,12 +301,13 @@ public abstract class ITBase { // Fetch the Binding Sets for the query. 
final ScannerConfiguration scanConfig = new ScannerConfiguration(); - scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), FluoQueryColumns.QUERY_BINDING_SET.getQualifier()); + scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(), + FluoQueryColumns.QUERY_BINDING_SET.getQualifier()); BindingSetStringConverter converter = new BindingSetStringConverter(); final RowIterator rowIter = snapshot.get(scanConfig); - while(rowIter.hasNext()) { + while (rowIter.hasNext()) { final Entry<Bytes, ColumnIterator> row = rowIter.next(); final String bindingSetString = row.getValue().next().getValue().toString(); final BindingSet bindingSet = converter.convert(bindingSetString, varOrder); @@ -285,77 +318,94 @@ public abstract class ITBase { return bindingSets; } - /** - * Setup a Mini Accumulo cluster that uses a temporary directory to store its data. - * - * @return A Mini Accumulo cluster. - */ - private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - final File miniDataDir = Files.createTempDir(); - - // Setup and start the Mini Accumulo. - final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password"); - accumulo.start(); - - // Store a connector to the Mini Accumulo. - final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); - accumuloConn = instance.getConnector("root", new PasswordToken("password")); - - return accumulo; + private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD); + cluster = new MiniAccumuloCluster(cfg); + cluster.start(); + + // Store a connector to the Mini Accumulo. 
+ instanceName = cluster.getInstanceName(); + zookeepers = cluster.getZooKeepers(); + + Instance instance = new ZooKeeperInstance(instanceName, zookeepers); + accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD)); } - /** - * Format a Mini Accumulo to be a Rya repository. - * - * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null) - * @return The Rya repository sitting on top of the Mini Accumulo. - */ - private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException { - checkNotNull(accumulo); - - // Setup the Rya Repository that will be used to create Repository Connections. - final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); - final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); - crdfdao.setConnector(accumuloConn); + /** + * Sets up a Rya instance + * + * @param user + * @param password + * @param instanceName + * @param zookeepers + * @param appName + * @return + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws RepositoryException + * @throws RyaDAOException + * @throws NumberFormatException + * @throws UnknownHostException + * @throws InferenceEngineException + */ + protected static RyaSailRepository setupRya(String user, String password, String instanceName, String zookeepers, String appName) + throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, + NumberFormatException, UnknownHostException, InferenceEngineException { + + checkNotNull(user); + checkNotNull(password); + checkNotNull(instanceName); + checkNotNull(zookeepers); + checkNotNull(appName); // Setup Rya configuration values. 
final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("demo_"); + conf.setTablePrefix(RYA_TABLE_PREFIX); conf.setDisplayQueryPlan(true); - - conf.setBoolean(USE_MOCK_INSTANCE, true); - conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); - conf.set(CLOUDBASE_USER, "root"); - conf.set(CLOUDBASE_PASSWORD, "password"); - conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName()); - - crdfdao.setConf(conf); - ryaStore.setRyaDAO(crdfdao); - - final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); + conf.set(ConfigUtils.CLOUDBASE_USER, user); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, password); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, appName); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + + Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); ryaRepo.initialize(); return ryaRepo; } /** - * Override this method to provide an output configuration to the Fluo application. + * Override this method to provide an output configuration to the Fluo + * application. * <p> * Returns an empty map by default. * - * @return The parameters that will be passed to {@link QueryResultObserver} at startup. + * @return The parameters that will be passed to {@link QueryResultObserver} + * at startup. 
*/ protected Map<String, String> makeExportParams() { return new HashMap<>(); } /** - * Setup a Mini Fluo cluster that uses a temporary directory to store its data.ll + * Setup a Mini Fluo cluster that uses a temporary directory to store its + * data.ll * * @return A Mini Fluo cluster. */ - protected MiniFluo startMiniFluo() { - final File miniDataDir = Files.createTempDir(); + protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { +// final File miniDataDir = Files.createTempDir(); // Setup the observers that will be used by the Fluo PCJ Application. final List<ObserverConfiguration> observers = new ArrayList<>(); @@ -364,18 +414,29 @@ public abstract class ITBase { observers.add(new ObserverConfiguration(JoinObserver.class.getName())); observers.add(new ObserverConfiguration(FilterObserver.class.getName())); - // Provide export parameters child test classes may provide to the export observer. - final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName()); - exportObserverConfig.setParameters( makeExportParams() ); + // Provide export parameters child test classes may provide to the + // export observer. + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration( + QueryResultObserver.class.getName()); + exportObserverConfig.setParameters(makeExportParams()); observers.add(exportObserverConfig); // Configure how the mini fluo will run. 
final FluoConfiguration config = new FluoConfiguration(); - config.setApplicationName("IntegrationTests"); - config.setMiniDataDir(miniDataDir.getAbsolutePath()); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(ACCUMULO_USER); + config.setAccumuloPassword(ACCUMULO_PASSWORD); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(appName); + config.setAccumuloTable("fluo" + appName); + config.addObservers(observers); - final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); - return miniFluo; + FluoFactory.newAdmin(config).initialize( + new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); + return FluoFactory.newMiniFluo(config); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 566d2d2..124f5a9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -31,6 +31,9 @@ import org.junit.Test; import com.google.common.base.Optional; import com.google.common.io.Files; +import io.fluo.api.client.FluoAdmin; +import io.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import io.fluo.api.client.FluoAdmin.TableExistsException; import io.fluo.api.client.FluoFactory; import io.fluo.api.config.FluoConfiguration; import 
io.fluo.api.config.ObserverConfiguration; @@ -48,24 +51,34 @@ public class CountStatementsIT extends ITBase { * statements are inserted as part of the test will not be consumed. * * @return A Mini Fluo cluster. + * @throws TableExistsException */ @Override - protected MiniFluo startMiniFluo() { - final File miniDataDir = Files.createTempDir(); - + protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { // Setup the observers that will be used by the Fluo PCJ Application. final List<ObserverConfiguration> observers = new ArrayList<>(); // Configure how the mini fluo will run. final FluoConfiguration config = new FluoConfiguration(); - config.setApplicationName("IntegrationTests"); - config.setMiniDataDir(miniDataDir.getAbsolutePath()); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(ACCUMULO_USER); + config.setAccumuloPassword(ACCUMULO_PASSWORD); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(appName); + config.setAccumuloTable("fluo" + appName); + config.addObservers(observers); + FluoFactory.newAdmin(config).initialize( + new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); return miniFluo; } + @Test public void test() { // Insert some Triples into the Fluo app. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 1ebc29b..873b78e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -19,6 +19,8 @@ package org.apache.rya.indexing.pcj.fluo.integration; import static org.junit.Assert.assertEquals; +import io.fluo.api.client.Snapshot; +import io.fluo.api.data.Bytes; import java.util.HashMap; import java.util.HashSet; @@ -26,6 +28,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import mvm.rya.api.domain.RyaStatement; + import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -54,10 +58,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import io.fluo.api.client.Snapshot; -import io.fluo.api.data.Bytes; -import mvm.rya.api.domain.RyaStatement; - /** * Performs integration tests over the Fluo application geared towards Rya PCJ exporting. 
* <p> @@ -77,10 +77,10 @@ public class RyaExportIT extends ITBase { final RyaExportParameters ryaParams = new RyaExportParameters(params); ryaParams.setExportToRya(true); - ryaParams.setAccumuloInstanceName(accumulo.getInstanceName()); - ryaParams.setZookeeperServers(accumulo.getZooKeepers()); - ryaParams.setExporterUsername("root"); - ryaParams.setExporterPassword("password"); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); + ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); return params; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java new file mode 100644 index 0000000..68fd842 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; +import io.fluo.api.client.FluoClient; + +import java.util.HashSet; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.indexing.external.PrecomputedJoinIndexer; + +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.BindingImpl; +import org.openrdf.repository.RepositoryConnection; + +import com.google.common.collect.Sets; + + +/** + * This test ensures that the correct updates are pushed by Fluo + * to the external PCJ table as triples are added to Rya through + * the {@link RepositoryConnection}. The key difference between these + * tests and those in {@link InputIT} is that streaming triples are added through + * the RepositoryConnection and not through the {@link FluoClient}. These tests are + * designed to verify that the {@link AccumuloRyaDAO} has been integrated + * with the {@link PrecomputedJoinIndexer} and that the associated {@link PrecomputedJoinUpdater} updates + * Fluo accordingly. + * + */ + +public class RyaInputIncrementalUpdateIT extends ITBase { + + /** + * Ensure historic matches are included in the result. 
+ */ + @Test + public void streamResultsThroughRya() throws Exception { + + // A query that finds people who talk to Eve and work at Chipotle. + final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + "}"; + + // Triples that are streamed into Rya after the PCJ is created. + final Set<Statement> historicTriples = Sets.newHashSet( + makeStatement("http://Alice", "http://talksTo", "http://Eve"), + makeStatement("http://Bob", "http://talksTo", "http://Eve"), + makeStatement("http://Charlie", "http://talksTo", "http://Eve"), + + makeStatement("http://Eve", "http://helps", "http://Kevin"), + + makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), + makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), + makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), + makeStatement("http://David", "http://worksAt", "http://Chipotle")); + + // The expected results of the SPARQL query once the PCJ has been + // computed. + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie")))); + + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, + new HashSet<VariableOrder>(), sparql); + + // Wait until the PCJ creation has been fully processed before streaming data. + fluo.waitForObservers(); + + // Stream the triples into Rya.
+ for (final Statement triple : historicTriples) { + ryaConn.add(triple); + } + + fluo.waitForObservers(); + + final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); + assertEquals(expected, results); + } + + /** + * Simulates the case where a Triple is added to Rya, a new query that + * includes that triple as a historic match is inserted into Fluo, and then + * some new triple that matches the query is streamed in through Rya. The query's + * results must include both the historic result and the newly streamed + * result. + */ + @Test + public void historicThenStreamedResults() throws Exception { + // A query that finds people who talk to Eve and work at Chipotle. + final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + "}"; + + // Triples that are loaded into Rya before the PCJ is created. + final Set<Statement> historicTriples = Sets.newHashSet( + makeStatement("http://Alice", "http://talksTo", "http://Eve"), + makeStatement("http://Alice", "http://worksAt", "http://Chipotle"), + makeStatement("http://Joe", "http://worksAt", "http://Chipotle")); + + // Triples that will be streamed into Rya after the PCJ has been created. + final Set<Statement> streamedTriples = Sets.newHashSet( + makeStatement("http://Frank", "http://talksTo", "http://Eve"), + makeStatement("http://Joe", "http://talksTo", "http://Eve"), + makeStatement("http://Frank", "http://worksAt", "http://Chipotle")); + + // Load the historic data into Rya. + for (final Statement triple : historicTriples) { + ryaConn.add(triple); + } + + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, + new HashSet<VariableOrder>(), sparql); + fluo.waitForObservers(); + + // Load the streaming data into Rya. + for (final Statement triple : streamedTriples) { + ryaConn.add(triple); + } + + // Ensure the historic match (Alice) and the streamed matches (Frank, Joe) are all present.
+ fluo.waitForObservers(); + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")))); + + Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); + assertEquals(expected, results); + } + + @Test + public void historicAndStreamMultiVariables() throws Exception { + // A query that finds, for each person ?x who works at Chipotle, + // every person ?y that ?x talks to. + final String sparql = "SELECT ?x ?y " + "WHERE { " + "?x <http://talksTo> ?y. " + + "?x <http://worksAt> <http://Chipotle>." + "}"; + + // Triples that are loaded into Rya before the PCJ is created. + final Set<Statement> historicTriples = Sets.newHashSet( + makeStatement("http://Alice", "http://talksTo", "http://Eve"), + makeStatement("http://Alice", "http://worksAt", "http://Chipotle"), + makeStatement("http://Joe", "http://worksAt", "http://Chipotle")); + + // Triples that will be streamed into Rya after the PCJ has been created. + final Set<Statement> streamedTriples = Sets.newHashSet( + makeStatement("http://Frank", "http://talksTo", "http://Betty"), + makeStatement("http://Joe", "http://talksTo", "http://Alice"), + makeStatement("http://Frank", "http://worksAt", "http://Chipotle")); + + // Load the historic data into Rya. + for (final Statement triple : historicTriples) { + ryaConn.add(triple); + } + + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, + new HashSet<VariableOrder>(), sparql); + fluo.waitForObservers(); + + // Load the streaming data into Rya. + for (final Statement triple : streamedTriples) { + ryaConn.add(triple); + } + + // Ensure the historic match and both streamed matches are all present.
+ fluo.waitForObservers(); + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")), new BindingImpl("y", new URIImpl("http://Eve")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")), new BindingImpl("y", new URIImpl("http://Betty")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")), new BindingImpl("y", new URIImpl("http://Alice")))); + + Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); + assertEquals(expected, results); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d9aaca79/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index 2e00e6b..95228b8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.visibility; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import io.fluo.api.client.Snapshot; +import io.fluo.api.data.Bytes; import java.util.HashMap; import java.util.HashSet; @@ -27,6 +29,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTypeResolverException; + import org.apache.accumulo.core.client.AccumuloException; import 
org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -68,11 +73,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import io.fluo.api.client.Snapshot; -import io.fluo.api.data.Bytes; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTypeResolverException; - public class PcjVisibilityIT extends ITBase { private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); @@ -86,10 +86,10 @@ public class PcjVisibilityIT extends ITBase { final RyaExportParameters ryaParams = new RyaExportParameters(params); ryaParams.setExportToRya(true); - ryaParams.setAccumuloInstanceName(accumulo.getInstanceName()); - ryaParams.setZookeeperServers(accumulo.getZooKeepers()); - ryaParams.setExporterUsername("root"); - ryaParams.setExporterPassword("password"); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); + ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); return params; }
