RYA-64 - Integrated Rya PCJ Secondary Index support into core Rya.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/14073a23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/14073a23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/14073a23 Branch: refs/heads/develop Commit: 14073a23fab3c8c9dca37747d732b83793aee3a3 Parents: 9efae3e Author: Kevin Chilton <[email protected]> Authored: Thu Apr 14 10:34:40 2016 -0400 Committer: Kevin Chilton <[email protected]> Committed: Fri May 13 11:26:19 2016 -0400 ---------------------------------------------------------------------- extras/indexing/pom.xml | 16 +- .../accumulo/precompQuery/AccumuloPcjQuery.java | 67 +- .../mvm/rya/indexing/accumulo/ConfigUtils.java | 226 +++-- .../indexing/accumulo/VisibilityBindingSet.java | 90 -- .../indexing/external/BindingSetDecorator.java | 105 --- .../indexing/external/PrecompJoinOptimizer.java | 30 +- .../external/PrecomputedJoinIndexer.java | 255 ++++++ .../external/PrecomputedJoinIndexerConfig.java | 113 +++ .../PrecomputedJoinStorageSupplier.java | 78 ++ .../PrecomputedJoinUpdaterSupplier.java | 78 ++ .../external/accumulo/AccumuloPcjStorage.java | 96 +++ .../accumulo/AccumuloPcjStorageConfig.java | 57 ++ .../accumulo/AccumuloPcjStorageSupplier.java | 77 ++ .../indexing/external/fluo/FluoPcjUpdater.java | 89 ++ .../external/fluo/FluoPcjUpdaterConfig.java | 120 +++ .../external/fluo/FluoPcjUpdaterSupplier.java | 93 +++ .../external/tupleSet/AccumuloIndexSet.java | 43 +- .../tupleSet/AccumuloPcjSerializer.java | 187 ----- .../external/tupleSet/BindingSetConverter.java | 108 --- .../tupleSet/BindingSetStringConverter.java | 149 ---- .../indexing/external/tupleSet/PcjTables.java | 833 ------------------- .../VisibilityBindingSetStringConverter.java | 62 -- .../AccumuloConstantPcjIntegrationTest.java | 10 +- .../external/AccumuloPcjIntegrationTest.java | 16 +- .../indexing/external/PCJOptionalTestIT.java | 18 +- 
.../external/PcjIntegrationTestingUtil.java | 37 +- .../PrecompJoinOptimizerIntegrationTest.java | 10 +- .../PrecomputedJoinStorageSupplierTest.java | 79 ++ .../PrecomputedJoinUpdaterSupplierTest.java | 79 ++ .../external/tupleSet/AccumuloIndexSetTest.java | 40 +- .../tupleSet/AccumuloPcjSerialzerTest.java | 173 ---- .../tupleSet/BindingSetStringConverterTest.java | 310 ------- .../tupleSet/PcjTablesIntegrationTests.java | 438 ---------- .../external/tupleSet/PcjTablesTests.java | 84 -- ...VisibilityBindingSetStringConverterTest.java | 132 --- .../src/main/java/RyaDirectExample.java | 6 +- extras/pom.xml | 1 + extras/rya.indexing.pcj/.gitignore | 1 + extras/rya.indexing.pcj/pom.xml | 84 ++ .../rya/indexing/pcj/storage/PcjException.java | 47 ++ .../rya/indexing/pcj/storage/PcjMetadata.java | 113 +++ .../pcj/storage/PrecomputedJoinStorage.java | 128 +++ .../storage/accumulo/AccumuloPcjSerializer.java | 185 ++++ .../storage/accumulo/BindingSetConverter.java | 106 +++ .../storage/accumulo/BindingSetDecorator.java | 105 +++ .../accumulo/BindingSetStringConverter.java | 148 ++++ .../storage/accumulo/PcjTableNameFactory.java | 73 ++ .../pcj/storage/accumulo/PcjTables.java | 653 +++++++++++++++ .../storage/accumulo/PcjVarOrderFactory.java | 37 + .../storage/accumulo/ShiftVarOrderFactory.java | 55 ++ .../pcj/storage/accumulo/VariableOrder.java | 117 +++ .../storage/accumulo/VisibilityBindingSet.java | 88 ++ .../VisibilityBindingSetStringConverter.java | 59 ++ .../pcj/update/PrecomputedJoinUpdater.java | 118 +++ .../accumulo/AccumuloPcjSerialzerTest.java | 175 ++++ .../accumulo/BindingSetStringConverterTest.java | 311 +++++++ .../accumulo/PcjTablesIntegrationTests.java | 546 ++++++++++++ .../pcj/storage/accumulo/PcjTablesTests.java | 84 ++ ...VisibilityBindingSetStringConverterTest.java | 133 +++ extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 4 + .../rya/indexing/pcj/fluo/api/CreatePcj.java | 12 +- .../indexing/pcj/fluo/api/GetPcjMetadata.java | 6 +- 
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 8 +- .../pcj/fluo/app/FilterResultUpdater.java | 8 +- .../pcj/fluo/app/JoinResultUpdater.java | 10 +- .../pcj/fluo/app/QueryResultUpdater.java | 8 +- .../app/export/IncrementalResultExporter.java | 3 +- .../fluo/app/export/rya/RyaResultExporter.java | 6 +- .../export/rya/RyaResultExporterFactory.java | 2 +- .../fluo/app/observers/BindingSetUpdater.java | 4 +- .../pcj/fluo/app/observers/FilterObserver.java | 6 +- .../pcj/fluo/app/observers/JoinObserver.java | 6 +- .../fluo/app/observers/QueryResultObserver.java | 6 +- .../app/observers/StatementPatternObserver.java | 6 +- .../pcj/fluo/app/observers/TripleObserver.java | 6 +- .../pcj/fluo/app/query/CommonNodeMetadata.java | 3 +- .../pcj/fluo/app/query/FilterMetadata.java | 3 +- .../fluo/app/query/FluoQueryMetadataDAO.java | 2 +- .../pcj/fluo/app/query/JoinMetadata.java | 3 +- .../pcj/fluo/app/query/QueryMetadata.java | 3 +- .../fluo/app/query/SparqlFluoQueryBuilder.java | 3 +- .../app/query/StatementPatternMetadata.java | 3 +- .../pcj/fluo/app/LeftOuterJoinTest.java | 3 +- .../indexing/pcj/fluo/app/NaturalJoinTest.java | 3 +- .../fluo/client/command/ListQueriesCommand.java | 2 +- .../fluo/client/command/NewQueryCommand.java | 2 +- .../fluo/client/util/ParsedQueryRequest.java | 3 +- .../fluo/client/util/PcjMetadataRenderer.java | 5 +- .../pcj/fluo/client/ParsedQueryRequestTest.java | 3 +- .../fluo/client/PcjMetadataRendererTest.java | 5 +- .../rya/indexing/pcj/fluo/demo/DemoDriver.java | 14 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 12 +- .../apache/rya/indexing/pcj/fluo/ITBase.java | 18 +- .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 7 +- .../indexing/pcj/fluo/api/GetQueryReportIT.java | 4 +- .../indexing/pcj/fluo/api/ListQueryIdsIT.java | 2 +- .../fluo/app/query/FluoQueryMetadataDAOIT.java | 2 +- .../indexing/pcj/fluo/integration/InputIT.java | 2 +- .../indexing/pcj/fluo/integration/QueryIT.java | 2 +- .../pcj/fluo/integration/RyaExportIT.java | 12 +- 
.../pcj/fluo/visibility/PcjVisibilityIT.java | 20 +- extras/rya.pcj.fluo/pom.xml | 32 - pom.xml | 38 + 103 files changed, 4996 insertions(+), 3087 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index 53e516d..d819199 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -76,16 +76,24 @@ under the License. <artifactId>geomesa-accumulo-datastore</artifactId> </dependency> + <!-- PCJ Indexing --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing.pcj</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.api</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-minicluster</artifactId> - <version>${accumulo.version}</version> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java index dd1e9c9..f6b7819 100644 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java @@ -1,5 +1,32 @@ package mvm.rya.accumulo.precompQuery; +import java.util.Collection; +import java.util.Iterator; +import 
java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -24,38 +51,10 @@ package mvm.rya.accumulo.precompQuery; */ import info.aduna.iteration.CloseableIteration; import info.aduna.iteration.Iteration; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; - import mvm.rya.api.resolver.RyaTypeResolverException; import mvm.rya.indexing.PcjQuery; import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; -import mvm.rya.indexing.external.tupleSet.BindingSetConverter.BindingSetConversionException; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; -import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** * This class encapsulates how pre-computed join tables are used during query @@ -71,10 +70,10 @@ import com.google.common.collect.Sets; */ public class AccumuloPcjQuery implements PcjQuery { private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - + private final Connector accCon; private final String tableName; - + public AccumuloPcjQuery(Connector accCon, String tableName) { this.accCon = accCon; this.tableName = tableName; @@ -324,7 +323,7 @@ public class 
AccumuloPcjQuery implements PcjQuery { bSet.addBinding(var, bs.getBinding(var).getValue()); } } - + return converter.convert(bSet, new VariableOrder(prefixVars)); } @@ -333,17 +332,17 @@ public class AccumuloPcjQuery implements PcjQuery { * @param key - Accumulo key obtained from scan * @param tableVarMap - map that associated query variables and table variables * @return - BindingSet without values associated with constant constraints - * @throws BindingSetConversionException + * @throws BindingSetConversionException */ private static BindingSet getBindingSetWithoutConstants(Key key, Map<String, String> tableVarMap) throws BindingSetConversionException { final byte[] row = key.getRow().getBytes(); final String[] varOrder = key.getColumnFamily().toString() .split(ExternalTupleSet.VAR_ORDER_DELIM); - + BindingSet bindingSet = converter.convert(row, new VariableOrder(varOrder)); final QueryBindingSet temp = new QueryBindingSet(bindingSet); - + final QueryBindingSet bs = new QueryBindingSet(); for (final String var : temp.getBindingNames()) { if (!tableVarMap.get(var).startsWith(ExternalTupleSet.CONST_PREFIX)) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java index cf98078..6c87182 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,20 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.FilterFunctionOptimizer; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; -import mvm.rya.indexing.accumulo.entity.EntityOptimizer; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; -import mvm.rya.indexing.accumulo.freetext.Tokenizer; -import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import mvm.rya.indexing.external.PrecompJoinOptimizer; -import mvm.rya.indexing.mongodb.MongoFreeTextIndexer; -import mvm.rya.indexing.mongodb.MongoGeoIndexer; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -63,6 +49,20 @@ import org.openrdf.model.impl.URIImpl; import com.google.common.collect.Lists; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.FilterFunctionOptimizer; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.entity.EntityOptimizer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import 
mvm.rya.indexing.external.PrecompJoinOptimizer; +import mvm.rya.indexing.mongodb.MongoFreeTextIndexer; +import mvm.rya.indexing.mongodb.MongoGeoIndexer; + /** * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. */ @@ -88,17 +88,17 @@ public class ConfigUtils { public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions"; public static final String TEMPORAL_TABLENAME = "sc.temporal.index"; public static final String ENTITY_TABLENAME = "sc.entity.index"; - + public static final String USE_GEO = "sc.use_geo"; public static final String USE_FREETEXT = "sc.use_freetext"; public static final String USE_TEMPORAL = "sc.use_temporal"; public static final String USE_ENTITY = "sc.use_entity"; public static final String USE_PCJ = "sc.use_pcj"; public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; - + public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail"; public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail"; - + public static final String USE_MOCK_INSTANCE = ".useMockInstance"; public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; @@ -118,22 +118,22 @@ public class ConfigUtils { public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; - + public static final String USE_MONGO = "sc.useMongo"; - public static boolean isDisplayQueryPlan(Configuration conf){ + public static boolean isDisplayQueryPlan(final Configuration conf){ return conf.getBoolean(DISPLAY_QUERY_PLAN, false); } - + /** * get a value from the configuration file and throw an exception if the value does not exist. 
- * + * * @param conf * @param key * @return */ - private static String getStringCheckSet(Configuration conf, String key) { - String value = conf.get(key); + private static String getStringCheckSet(final Configuration conf, final String key) { + final String value = conf.get(key); Validate.notNull(value, key + " not set"); return value; } @@ -146,9 +146,9 @@ public class ConfigUtils { * @throws AccumuloSecurityException * @throws TableExistsException */ - public static boolean createTableIfNotExists(Configuration conf, String tablename) throws AccumuloException, AccumuloSecurityException, + public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException, TableExistsException { - TableOperations tops = getConnector(conf).tableOperations(); + final TableOperations tops = getConnector(conf).tableOperations(); if (!tops.exists(tablename)) { logger.info("Creating table: " + tablename); tops.create(tablename); @@ -156,102 +156,102 @@ public class ConfigUtils { } return false; } - - private static String getIndexTableName(Configuration conf, String indexTableNameConf, String altSuffix){ + + private static String getIndexTableName(final Configuration conf, final String indexTableNameConf, final String altSuffix){ String value = conf.get(indexTableNameConf); if (value == null){ - String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + final String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); Validate.notNull(defaultTableName, indexTableNameConf + " not set and " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. 
Cannot generate table name."); value = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX) + altSuffix; } return value; } - public static String getFreeTextDocTablename(Configuration conf) { + public static String getFreeTextDocTablename(final Configuration conf) { return getIndexTableName(conf, FREE_TEXT_DOC_TABLENAME, "freetext"); } - public static String getFreeTextTermTablename(Configuration conf) { + public static String getFreeTextTermTablename(final Configuration conf) { return getIndexTableName(conf, FREE_TEXT_TERM_TABLENAME, "freetext_term"); } - public static int getFreeTextTermLimit(Configuration conf) { + public static int getFreeTextTermLimit(final Configuration conf) { return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); } - public static String getGeoTablename(Configuration conf) { + public static String getGeoTablename(final Configuration conf) { return getIndexTableName(conf, GEO_TABLENAME, "geo"); } - - public static String getTemporalTableName(Configuration conf) { + + public static String getTemporalTableName(final Configuration conf) { return getIndexTableName(conf, TEMPORAL_TABLENAME, "temporal"); } - - - public static String getEntityTableName(Configuration conf) { + + + public static String getEntityTableName(final Configuration conf) { return getIndexTableName(conf, ENTITY_TABLENAME, "entity"); } - public static Set<URI> getFreeTextPredicates(Configuration conf) { + public static Set<URI> getFreeTextPredicates(final Configuration conf) { return getPredicates(conf, FREETEXT_PREDICATES_LIST); } - public static Set<URI> getGeoPredicates(Configuration conf) { + public static Set<URI> getGeoPredicates(final Configuration conf) { return getPredicates(conf, GEO_PREDICATES_LIST); } /** - * Used for indexing statements about date & time instances and intervals. + * Used for indexing statements about date & time instances and intervals. * @param conf * @return Set of predicate URI's whose objects should be date time literals. 
*/ - public static Set<URI> getTemporalPredicates(Configuration conf) { + public static Set<URI> getTemporalPredicates(final Configuration conf) { return getPredicates(conf, TEMPORAL_PREDICATES_LIST); } - private static Set<URI> getPredicates(Configuration conf, String confName) { - String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); - Set<URI> predicates = new HashSet<URI>(); - for (String prediateString : validPredicateStrings) { + private static Set<URI> getPredicates(final Configuration conf, final String confName) { + final String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); + final Set<URI> predicates = new HashSet<URI>(); + for (final String prediateString : validPredicateStrings) { predicates.add(new URIImpl(prediateString)); } return predicates; } - public static Tokenizer getFreeTextTokenizer(Configuration conf) { - Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); + public static Tokenizer getFreeTextTokenizer(final Configuration conf) { + final Class<? 
extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); return ReflectionUtils.newInstance(c, conf); } - public static BatchWriter createDefaultBatchWriter(String tablename, Configuration conf) throws TableNotFoundException, + public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - Connector connector = ConfigUtils.getConnector(conf); + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); } - public static MultiTableBatchWriter createMultitableBatchWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException { - Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - Connector connector = ConfigUtils.getConnector(conf); + public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException { + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); } - public static Scanner 
createScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException, + public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Connector connector = ConfigUtils.getConnector(conf); - Authorizations auths = ConfigUtils.getAuthorizations(conf); + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); return connector.createScanner(tablename, auths); } - public static BatchScanner createBatchScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException, + public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Connector connector = ConfigUtils.getConnector(conf); - Authorizations auths = ConfigUtils.getAuthorizations(conf); + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); Integer numThreads = null; if (conf instanceof RdfCloudTripleStoreConfiguration) numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); @@ -260,123 +260,123 @@ public class ConfigUtils { return connector.createBatchScanner(tablename, auths, numThreads); } - public static int getWriterMaxWriteThreads(Configuration conf) { + public static int getWriterMaxWriteThreads(final Configuration conf) { return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); } - public static long getWriterMaxLatency(Configuration conf) { + public static long getWriterMaxLatency(final Configuration conf) { return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); } - public static long getWriterMaxMemory(Configuration conf) { + public static long getWriterMaxMemory(final Configuration conf) { return 
conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); } - public static String getUsername(JobContext job) { + public static String getUsername(final JobContext job) { return getUsername(job.getConfiguration()); } - public static String getUsername(Configuration conf) { + public static String getUsername(final Configuration conf) { return conf.get(CLOUDBASE_USER); } - public static Authorizations getAuthorizations(JobContext job) { + public static Authorizations getAuthorizations(final JobContext job) { return getAuthorizations(job.getConfiguration()); } - public static Authorizations getAuthorizations(Configuration conf) { - String authString = conf.get(CLOUDBASE_AUTHS, ""); + public static Authorizations getAuthorizations(final Configuration conf) { + final String authString = conf.get(CLOUDBASE_AUTHS, ""); if (authString.isEmpty()) { return new Authorizations(); } return new Authorizations(authString.split(",")); } - public static Instance getInstance(JobContext job) { + public static Instance getInstance(final JobContext job) { return getInstance(job.getConfiguration()); } - public static Instance getInstance(Configuration conf) { + public static Instance getInstance(final Configuration conf) { if (useMockInstance(conf)) { return new MockInstance(conf.get(CLOUDBASE_INSTANCE)); } return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS)); } - public static String getPassword(JobContext job) { + public static String getPassword(final JobContext job) { return getPassword(job.getConfiguration()); } - public static String getPassword(Configuration conf) { + public static String getPassword(final Configuration conf) { return conf.get(CLOUDBASE_PASSWORD, ""); } - public static Connector getConnector(JobContext job) throws AccumuloException, AccumuloSecurityException { + public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException { return getConnector(job.getConfiguration()); } - 
public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException { - Instance instance = ConfigUtils.getInstance(conf); + public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException { + final Instance instance = ConfigUtils.getInstance(conf); return instance.getConnector(getUsername(conf), getPassword(conf)); } - public static boolean useMockInstance(Configuration conf) { + public static boolean useMockInstance(final Configuration conf) { return conf.getBoolean(USE_MOCK_INSTANCE, false); } - private static int getNumPartitions(Configuration conf) { + private static int getNumPartitions(final Configuration conf) { return conf.getInt(NUM_PARTITIONS, 25); } - public static int getFreeTextDocNumPartitions(Configuration conf) { + public static int getFreeTextDocNumPartitions(final Configuration conf) { return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); } - public static int getFreeTextTermNumPartitions(Configuration conf) { + public static int getFreeTextTermNumPartitions(final Configuration conf) { return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); } - public static int getGeoNumPartitions(Configuration conf) { + public static int getGeoNumPartitions(final Configuration conf) { return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf)); } - - public static boolean getUseGeo(Configuration conf) { + + public static boolean getUseGeo(final Configuration conf) { return conf.getBoolean(USE_GEO, false); } - - public static boolean getUseFreeText(Configuration conf) { + + public static boolean getUseFreeText(final Configuration conf) { return conf.getBoolean(USE_FREETEXT, false); } - - public static boolean getUseTemporal(Configuration conf) { + + public static boolean getUseTemporal(final Configuration conf) { return conf.getBoolean(USE_TEMPORAL, false); } - - public static boolean getUseEntity(Configuration conf) { + + public 
static boolean getUseEntity(final Configuration conf) { return conf.getBoolean(USE_ENTITY, false); } - - public static boolean getUsePCJ(Configuration conf) { + + public static boolean getUsePCJ(final Configuration conf) { return conf.getBoolean(USE_PCJ, false); } - - public static boolean getUseOptimalPCJ(Configuration conf) { + + public static boolean getUseOptimalPCJ(final Configuration conf) { return conf.getBoolean(USE_OPTIMAL_PCJ, false); } - - public static boolean getUseMongo(Configuration conf) { + + public static boolean getUseMongo(final Configuration conf) { return conf.getBoolean(USE_MONGO, false); } - - - public static void setIndexers(RdfCloudTripleStoreConfiguration conf) { - - List<String> indexList = Lists.newArrayList(); - List<String> optimizers = Lists.newArrayList(); - - boolean useFilterIndex = false; - + + + public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { + + final List<String> indexList = Lists.newArrayList(); + final List<String> optimizers = Lists.newArrayList(); + + boolean useFilterIndex = false; + if (ConfigUtils.getUseMongo(conf)) { if (getUseGeo(conf)) { indexList.add(MongoGeoIndexer.class.getName()); @@ -408,7 +408,7 @@ public class ConfigUtils { } } - + if (useFilterIndex) { optimizers.add(FilterFunctionOptimizer.class.getName()); } @@ -418,12 +418,8 @@ public class ConfigUtils { optimizers.add(EntityOptimizer.class.getName()); } - + conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); conf.setStrings(AccumuloRdfConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); - } - - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java 
b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java deleted file mode 100644 index b9e2351..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/VisibilityBindingSet.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.accumulo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.openrdf.query.BindingSet; - -import mvm.rya.indexing.external.BindingSetDecorator; - -/** - * Decorates a {@link BindingSet} with a collection of visibilities. - */ -@ParametersAreNonnullByDefault -public class VisibilityBindingSet extends BindingSetDecorator { - private static final long serialVersionUID = 1L; - private final String visibility; - private volatile int hashCode; - - /** - * @param set - Decorates the {@link BindingSet} with no visibilities. 
- */ - public VisibilityBindingSet(final BindingSet set) { - this(set, ""); - } - - /** - * Creates a new {@link VisibilityBindingSet} - * @param set - The {@link BindingSet} to decorate - * @param visibility - The visibilities on the {@link BindingSet} (not null) - */ - public VisibilityBindingSet(final BindingSet set, final String visibility) { - super(set); - this.visibility = checkNotNull(visibility); - } - - /** - * @return - The Visibilities on the {@link BindingSet} - */ - public String getVisibility() { - return visibility; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } else if(o instanceof VisibilityBindingSet) { - final VisibilityBindingSet other = (VisibilityBindingSet) o; - return set.equals(other) && visibility.equals(other.getVisibility()); - } - return false; - } - - @Override - public int hashCode() { - int result = hashCode; - if(result == 0) { - result = 31 * result + visibility.hashCode(); - result = 31 * result + super.hashCode(); - hashCode = result; - } - return result; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(super.toString()); - sb.append("\n Visibility: " + getVisibility() + "\n"); - return sb.toString(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java deleted file mode 100644 index b4909bd..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/BindingSetDecorator.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Iterator; -import java.util.Set; - -import org.openrdf.model.Value; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; - -/** - * Abstracts out the decoration of a {@link BindingSet}. - */ -public abstract class BindingSetDecorator implements BindingSet { - private static final long serialVersionUID = 1L; - protected final BindingSet set; - private volatile int hashCode; - - /** - * Constructs a new {@link BindingSetDecorator}, decorating the provided - * {@link BindingSet}. - * @param set - The {@link BindingSet} to be decorated. 
(not null) - */ - public BindingSetDecorator(final BindingSet set) { - this.set = checkNotNull(set); - } - - @Override - public Iterator<Binding> iterator() { - return set.iterator(); - } - - @Override - public Set<String> getBindingNames() { - return set.getBindingNames(); - } - - @Override - public Binding getBinding(final String bindingName) { - return set.getBinding(bindingName); - } - - @Override - public boolean hasBinding(final String bindingName) { - return set.hasBinding(bindingName); - } - - @Override - public Value getValue(final String bindingName) { - return set.getValue(bindingName); - } - - @Override - public int size() { - return set.size(); - } - - @Override - public boolean equals(final Object o) { - if(!(o instanceof BindingSetDecorator)) { - return false; - } - final BindingSetDecorator other = (BindingSetDecorator) o; - return set.equals(other.set); - } - - @Override - public int hashCode() { - int result = hashCode; - if(result == 0) { - result = 31 * result + set.hashCode(); - hashCode = result; - } - return result; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append(" names: "); - for (final String name : getBindingNames()) { - sb.append("\n [name]: " + name + " --- [value]: " + getBinding(name).getValue().toString()); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java index f8c6c77..75497da 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java @@ -27,27 +27,14 @@ import 
java.util.List; import java.util.Map; import java.util.Set; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; -import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; -import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector; -import mvm.rya.indexing.IndexPlanValidator.TupleReArranger; -import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; -import mvm.rya.indexing.external.tupleSet.PcjTables; -import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; -import mvm.rya.rdftriplestore.inference.DoNotExpandSP; -import mvm.rya.rdftriplestore.utils.FixedStatementPattern; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.MalformedQueryException; @@ -82,6 +69,19 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; +import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; +import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector; +import mvm.rya.indexing.IndexPlanValidator.TupleReArranger; +import 
mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector; +import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.rdftriplestore.inference.DoNotExpandSP; +import mvm.rya.rdftriplestore.utils.FixedStatementPattern; + /** * {@link QueryOptimizer} which matches TupleExpressions associated with * pre-computed queries to sub-queries of a given query. Each matched sub-query http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java new file mode 100644 index 0000000..cce0a81 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.indexing.external; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; + +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; +import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier; + +/** + * Updates the state of the Precomputed Join indices that are used by Rya. + */ +@ParametersAreNonnullByDefault +public class PrecomputedJoinIndexer implements AccumuloIndexer { + private static final Logger log = Logger.getLogger(PrecomputedJoinIndexer.class); + + /** + * This configuration object must be set before {@link #init()} is invoked. + * It is set by {@link #setConf(Configuration)}. + */ + private Optional<Configuration> conf = Optional.absent(); + + /** + * The Accumulo Connector that must be used when accessing an Accumulo storage. + * This value is provided by {@link #setConnector(Connector)}.
+ */ + private Optional<Connector> accumuloConn = Optional.absent(); + + /** + * Provides access to the {@link Configuration} that was provided to this class + * using {@link #setConf(Configuration)}. + */ + private final Supplier<Configuration> configSupplier = new Supplier<Configuration>() { + @Override + public Configuration get() { + return getConf(); + } + }; + + /** + * Provides access to the Accumulo {@link Connector} that was provided to + * this class using {@link #setConnector(Connector)}. + */ + private final Supplier<Connector> accumuloSupplier = new Supplier<Connector>() { + @Override + public Connector get() { + return accumuloConn.get(); + } + }; + + /** + * Creates (on first use) and caches the {@link PrecomputedJoinStorage} used to interact with the PCJ results that are stored + * and used by Rya. PrecomputedJoinStorageSupplier does not cache its result, so it is memoized to make every caller, including {@link #close()}, share one instance. + */ + private final Supplier<PrecomputedJoinStorage> pcjStorageSupplier = com.google.common.base.Suppliers.memoize( + new PrecomputedJoinStorageSupplier( + configSupplier, + new AccumuloPcjStorageSupplier(configSupplier, accumuloSupplier))); + + /** + * Creates (on first use) and caches the {@link PrecomputedJoinUpdater} that will be used to update the state stored + * within the PCJ tables that are stored in Accumulo. PrecomputedJoinUpdaterSupplier does not cache its result, so it + * is memoized to make every caller, including {@link #close()}, share one instance. + */ + private final Supplier<PrecomputedJoinUpdater> pcjUpdaterSupplier = com.google.common.base.Suppliers.memoize( + new PrecomputedJoinUpdaterSupplier( + configSupplier, + new FluoPcjUpdaterSupplier(configSupplier))); + + @Override + public void setConf(final Configuration conf) { + this.conf = Optional.fromNullable(conf); + } + + @Override + public Configuration getConf() { + return this.conf.get(); + } + + /** + * Set the connector that will be used by {@link AccumuloPcjStorage} if the + * application is configured to store the PCJs within Accumulo. + */ + @Override + public void setConnector(final Connector connector) { + checkNotNull(connector); + accumuloConn = Optional.of( connector ); + } + + /** + * This is invoked when the host {@link RyaDAO#init()} method is invoked. It eagerly creates and caches the PCJ storage and updater.
+ */ + @Override + public void init() { + pcjStorageSupplier.get(); + pcjUpdaterSupplier.get(); + } + + @Override + public void storeStatement(final RyaStatement statement) throws IOException { + checkNotNull(statement); + storeStatements( Collections.singleton(statement) ); + } + + @Override + public void storeStatements(final Collection<RyaStatement> statements) throws IOException { + checkNotNull(statements); + try { + pcjUpdaterSupplier.get().addStatements(statements); + } catch (final PcjUpdateException e) { + throw new IOException("Could not update the PCJs by adding the provided statements.", e); + } + } + + @Override + public void deleteStatement(final RyaStatement statement) throws IOException { + checkNotNull(statement); + try { + pcjUpdaterSupplier.get().deleteStatements( Collections.singleton(statement) ); + } catch (final PcjUpdateException e) { + throw new IOException("Could not update the PCJs by removing the provided statement.", e); + } + } + + @Override + public void flush() throws IOException { + try { + pcjUpdaterSupplier.get().flush(); + } catch (final PcjUpdateException e) { + throw new IOException("Could not flush the PCJ Updater.", e); + } + } + + @Override + public void close() { + try { + pcjStorageSupplier.get().close(); + } catch (final PCJStorageException e) { + log.error("Could not close the PCJ Storage instance.", e); + } + + try { + pcjUpdaterSupplier.get().close(); + } catch (final PcjUpdateException e) { + log.error("Could not close the PCJ Updater instance.", e); + } + } + + /** + * This is invoked when the host {@link RyaDAO#destroy()} method is invoked. + */ + @Override + public void destroy() { + close(); + } + + /** + * Deletes all data from the PCJ indices that are managed by a {@link PrecomputedJoinStorage}.
+ */ + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + final PrecomputedJoinStorage storage = pcjStorageSupplier.get(); + + try { + for(final String pcjId : storage.listPcjs()) { + try { + storage.purge(pcjId); + } catch(final PCJStorageException e) { + log.error("Could not purge the PCJ index with id: " + pcjId, e); + } + } + } catch (final PCJStorageException e) { + log.error("Could not purge the PCJ indicies because they could not be listed.", e); + } + } + + /** + * Deletes all of the PCJ indices that are managed by {@link PrecomputedJoinStorage}. + */ + @Override + public void dropAndDestroy() { + final PrecomputedJoinStorage storage = pcjStorageSupplier.get(); + + try { + for(final String pcjId : storage.listPcjs()) { + try { + storage.dropPcj(pcjId); + } catch(final PCJStorageException e) { + log.error("Could not delete the PCJ index with id: " + pcjId, e); + } + } + } catch(final PCJStorageException e) { + log.error("Could not delete the PCJ indicies because they could not be listed.", e); + } + } + + @Override + public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException { + // We do not need to use the writer that also writes to the core RYA tables. + } + + @Override + public void dropGraph(final RyaURI... graphs) { + log.warn("PCJ indices do not store Graph metadata, so graph results can not be dropped."); + } + + @Override + public String getTableName() { + // This method makes assumptions about how PCJs are stored. It's only + // used by AccumuloRyaDAO to purge data, so it should be replaced with + // a purge() method.
+ log.warn("PCJ indicies are not stored within a single table, so this method can not be implemented."); + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java new file mode 100644 index 0000000..c56f574 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexerConfig.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package mvm.rya.indexing.external; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +import com.google.common.base.Optional; + +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +/** + * Inspects the {@link Configuration} object that is provided to all instances + * of {@link RyaSecondaryIndexer} to provide {@link PrecomputedJoinIndexer} + * specific values. + */ +@ParametersAreNonnullByDefault +public class PrecomputedJoinIndexerConfig { + + /** + * Enumerates the different methodologies implemented to store the PCJ indices. + */ + public static enum PrecomputedJoinStorageType { + /** + * Stores each PCJ within an Accumulo table. + */ + ACCUMULO; + } + + /** + * Enumerates the different methodologies implemented to update the PCJ indices. + */ + public static enum PrecomputedJoinUpdaterType { + /** + * Incrementally updates the PCJs in pseudo-realtime as new adds/deletes are encountered. + */ + FLUO; + } + + // Indicates which implementation of PrecomputedJoinStorage to use. + public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; + + // Indicates which implementation of PrecomputedJoinUpdater to use. + public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; + + // The configuration object that is provided to Secondary Indexing implementations. + private final Configuration config; + + /** + * Constructs an instance of {@link PrecomputedJoinIndexerConfig}. + * + * @param config - The {@link Configuration} object that is provided to + * all instances of {@link RyaSecondaryIndexer}. It will be inspected + * for {@link PrecomputedJoinIndexer} specific values.
(not null) + */ + public PrecomputedJoinIndexerConfig(final Configuration config) { + this.config = checkNotNull(config); + } + + /** + * @return The type of {@link PrecomputedJoinStorage} to use, or absent if {@link #PCJ_STORAGE_TYPE} is not set. An unrecognized value throws IllegalArgumentException. + */ + public Optional<PrecomputedJoinStorageType> getPcjStorageType() { + final String storageTypeString = config.get(PCJ_STORAGE_TYPE); + if(storageTypeString == null) { + return Optional.absent(); + } + + final PrecomputedJoinStorageType storageType = PrecomputedJoinStorageType.valueOf(storageTypeString); + return Optional.of(storageType); + } + + /** + * @return The type of {@link PrecomputedJoinUpdater} to use, or absent if {@link #PCJ_UPDATER_TYPE} is not set. An unrecognized value throws IllegalArgumentException. + */ + public Optional<PrecomputedJoinUpdaterType> getPcjUpdaterType() { + final String updaterTypeString = config.get(PCJ_UPDATER_TYPE); + if(updaterTypeString == null) { + return Optional.absent(); + } + + final PrecomputedJoinUpdaterType updaterType = PrecomputedJoinUpdaterType.valueOf(updaterTypeString); + return Optional.of(updaterType); + } + + /** + * @return The configuration object that has been wrapped. + */ + public Configuration getConfig() { + return config; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java new file mode 100644 index 0000000..bf10c84 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; + +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; + +/** + * Creates an instance of {@link PrecomputedJoinStorage} based on the application's configuration. The configuration is re-read on every call to {@link #get()}; this class does not cache the result. + */ +public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinStorage> { + + private final Supplier<Configuration> configSupplier; + private final AccumuloPcjStorageSupplier accumuloSupplier; + + /** + * Constructs an instance of {@link PrecomputedJoinStorageSupplier}. + * + * @param configSupplier - Provides access to the configuration of the + * application used to initialize the storage. (not null) + * @param accumuloSupplier - Used to create an Accumulo instance of the + * storage if that is the configured type.
(not null) + */ + public PrecomputedJoinStorageSupplier( + final Supplier<Configuration> configSupplier, + final AccumuloPcjStorageSupplier accumuloSupplier) { + this.configSupplier = checkNotNull(configSupplier); + this.accumuloSupplier = checkNotNull(accumuloSupplier); + } + + @Override + public PrecomputedJoinStorage get() { + // Ensure a configuration has been set. + final Configuration config = configSupplier.get(); + checkNotNull(config, "Could not build the PrecomputedJoinStorage until the PrecomputedJoinIndexer has been configured."); + + final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); + + // Ensure the storage type has been set. + final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType(); + checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE + + "' property must have one of the following values: " + java.util.Arrays.toString(PrecomputedJoinStorageType.values())); + + // Create and return the configured storage. + switch(storageType.get()) { + case ACCUMULO: + return accumuloSupplier.get(); + + default: + throw new IllegalArgumentException("Unsupported PrecomputedJoinStorageType: " + storageType.get()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java new file mode 100644 index 0000000..cabadb4 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinUpdaterSupplier.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; + +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import mvm.rya.indexing.external.fluo.FluoPcjUpdaterSupplier; + +/** + * Creates an instance of {@link PrecomputedJoinUpdater} based on the application's configuration. The configuration is re-read on every call to {@link #get()}; this class does not cache the result. + */ +public class PrecomputedJoinUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> { + + private final Supplier<Configuration> configSupplier; + private final FluoPcjUpdaterSupplier fluoSupplier; + + /** + * Creates an instance of {@link PrecomputedJoinUpdaterSupplier}. + * + * @param configSupplier - Provides access to the configuration of the + * application used to initialize the updater. (not null) + * @param fluoSupplier - Used to create a Fluo instance of the updater + * if that is the configured type.
(not null) + */ + public PrecomputedJoinUpdaterSupplier( + final Supplier<Configuration> configSupplier, + final FluoPcjUpdaterSupplier fluoSupplier) { + this.configSupplier = checkNotNull(configSupplier); + this.fluoSupplier = checkNotNull(fluoSupplier); + } + + @Override + public PrecomputedJoinUpdater get() { + // Ensure a configuration has been set. + final Configuration config = configSupplier.get(); + checkNotNull(config, "Can not build the PrecomputedJoinUpdater until the PrecomputedJoinIndexer has been configured."); + + final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); + + // Ensure an updater type has been set. + final Optional<PrecomputedJoinUpdaterType> updaterType = indexerConfig.getPcjUpdaterType(); + checkArgument(updaterType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE + + "' property must have one of the following values: " + java.util.Arrays.toString(PrecomputedJoinUpdaterType.values())); + + // Create and return the configured updater. + switch(updaterType.get()) { + case FLUO: + return fluoSupplier.get(); + + default: + throw new IllegalArgumentException("Unsupported PrecomputedJoinUpdaterType: " + updaterType.get()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java new file mode 100644 index 0000000..6e79748 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +/** + * An Accumulo backed implementation of {@link PrecomputedJoinStorage}. Every + * operation is delegated to {@link PcjTables} using the provided {@link Connector}. + */ +@ParametersAreNonnullByDefault +public class AccumuloPcjStorage implements PrecomputedJoinStorage { + + private final PcjTableNameFactory pcjIdFactory = new PcjTableNameFactory(); + private final PcjTables pcjTables = new PcjTables(); + + private final Connector accumuloConn; + private final String ryaInstanceName; + + /** + * Constructs an instance of {@link AccumuloPcjStorage}. + * + * @param accumuloConn - The connector that will be used to connect to Accumulo.
(not null) + * @param ryaInstanceName - The name of the RYA instance that will be accessed. (not null) + */ + public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) { + this.accumuloConn = checkNotNull(accumuloConn); + this.ryaInstanceName = checkNotNull(ryaInstanceName); + } + + @Override + public List<String> listPcjs() throws PCJStorageException { + return pcjTables.listPcjTables(accumuloConn, ryaInstanceName); + } + + @Override + public String createPcj(final String sparql, final Set<VariableOrder> varOrders) throws PCJStorageException { + // The generated Accumulo table name doubles as the new PCJ's ID. + final String pcjId = pcjIdFactory.makeTableName(ryaInstanceName); + pcjTables.createPcjTable(accumuloConn, pcjId, varOrders, sparql); + return pcjId; + } + + @Override + public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException { + return pcjTables.getPcjMetadata(accumuloConn, pcjId); + } + + @Override + public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException { + pcjTables.addResults(accumuloConn, pcjId, results); + } + + @Override + public void purge(final String pcjId) throws PCJStorageException { + pcjTables.purgePcjTable(accumuloConn, pcjId); + } + + @Override + public void dropPcj(final String pcjId) throws PCJStorageException { + pcjTables.dropPcjTable(accumuloConn, pcjId); + } + + @Override + public void close() throws PCJStorageException { + // Accumulo Connectors don't require closing.
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java new file mode 100644 index 0000000..73b50db --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.hadoop.conf.Configuration; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; + +/** + * Configuration values required to initialize an {@link AccumuloPcjStorage}. + */ +public class AccumuloPcjStorageConfig { + + private final RdfCloudTripleStoreConfiguration config; + + /** + * Constructs an instance of {@link AccumuloPcjStorageConfig}.
+ * + * @param config - The configuration values that will be interpreted. (not null) + */ + public AccumuloPcjStorageConfig(final Configuration config) { + checkNotNull(config); + + // Wrapping the config with this class so that we can use its getTablePrefix() method. + this.config = new RdfCloudTripleStoreConfiguration(config) { + // NOTE(review): clone() is stubbed to return null, so this wrapper must never be cloned. + @Override + public RdfCloudTripleStoreConfiguration clone() { + return null; + } + }; + } + + /** + * @return The Rya Instance name the storage grants access to. This is the + * table prefix found in the wrapped configuration. + */ + public String getRyaInstanceName() { + return config.getTablePrefix(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java new file mode 100644 index 0000000..77b8f2e --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.accumulo; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.accumulo.core.client.Connector; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; + +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; + +/** + * Creates instances of {@link AccumuloPcjStorage} using the values found in a {@link Configuration}. + */ +public class AccumuloPcjStorageSupplier implements Supplier<AccumuloPcjStorage> { + + private final Supplier<Configuration> configSupplier; + private final Supplier<Connector> accumuloSupplier; + + /** + * Constructs an instance of {@link AccumuloPcjStorageSupplier}. + * + * @param configSupplier - Configures the {@link AccumuloPcjStorage} that is + * supplied by this class. (not null) + * @param accumuloSupplier - Provides the {@link Connector} that is used by + * the {@link AccumuloPcjStorage} that is supplied by this class. (not null) + */ + public AccumuloPcjStorageSupplier( + final Supplier<Configuration> configSupplier, + final Supplier<Connector> accumuloSupplier) { + this.configSupplier = checkNotNull(configSupplier); + this.accumuloSupplier = checkNotNull(accumuloSupplier); + } + + /** + * Builds a new {@link AccumuloPcjStorage} from the current configuration and Accumulo {@link Connector}. + * + * @return A new {@link AccumuloPcjStorage} for the configured Rya instance. + * @throws NullPointerException If the configuration or the Accumulo {@link Connector} has not been provided yet. + * @throws IllegalArgumentException If the PCJ storage type is not configured as {@code ACCUMULO}. + */ + @Override + public AccumuloPcjStorage get() { + // Ensure a configuration has been set. + final Configuration config = configSupplier.get(); + checkNotNull(config, "Could not create a AccumuloPcjStorage because the application's configuration has not been provided yet."); + + // Ensure the correct storage type has been set.
+ final PrecomputedJoinIndexerConfig indexerConfig = new PrecomputedJoinIndexerConfig(config); + + final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType(); + checkArgument(storageType.isPresent() && (storageType.get() == PrecomputedJoinStorageType.ACCUMULO), + "This supplier requires the '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE + + "' value be set to '" + PrecomputedJoinStorageType.ACCUMULO + "'."); + + // Ensure the Accumulo connector has been set. + final Connector accumuloConn = accumuloSupplier.get(); + checkNotNull(accumuloConn, "The Accumulo Connector must be set before initializing the AccumuloPcjStorage."); + + // The Rya instance name is read from the configuration by AccumuloPcjStorageConfig. + final String ryaInstanceName = new AccumuloPcjStorageConfig(config).getRyaInstanceName(); + return new AccumuloPcjStorage(accumuloConn, ryaInstanceName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java new file mode 100644 index 0000000..901bd61 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdater.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.fluo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; + +import com.google.common.base.Optional; + +import io.fluo.api.client.FluoClient; +import mvm.rya.api.domain.RyaStatement; + +/** + * Updates the PCJ indices by forwarding the statement additions/removals to + * a Fluo application. + */ +@ParametersAreNonnullByDefault +public class FluoPcjUpdater implements PrecomputedJoinUpdater { + private static final Logger log = Logger.getLogger(FluoPcjUpdater.class); + + // Ensures the unsupported statement deletion warning is only logged once per updater. + private boolean deleteWarningPrinted = false; + + private final FluoClient fluoClient; + private final InsertTriples insertTriples = new InsertTriples(); + private final String statementVis; + + /** + * Constructs an instance of {@link FluoPcjUpdater}. + * + * @param fluoClient - A connection to the Fluo table new statements will be + * inserted into and deleted from. (not null) + * @param statementVis - The visibility label that will be applied to all + * statements that are inserted via the Fluo PCJ updater.
(not null) + */ + public FluoPcjUpdater(final FluoClient fluoClient, final String statementVis) { + this.fluoClient = checkNotNull(fluoClient); + this.statementVis = checkNotNull(statementVis); + } + + /** + * Forwards the statements to the Fluo application, passing along the + * configured statement visibility. + */ + @Override + public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException { + insertTriples.insert(fluoClient, statements, Optional.of(statementVis)); + } + + /** + * The Fluo application does not support statement deletion, so the provided + * statements are ignored. A warning is logged the first time this is called. + */ + @Override + public void deleteStatements(final Collection<RyaStatement> statements) throws PcjUpdateException { + // The Fluo application does not support statement deletion. + if(!deleteWarningPrinted) { + log.warn("The Fluo PCJ updating application does not support Statement deletion, " + + "but you are trying to use that feature. This may result in your PCJ index " + + "no longer reflecting the Statements that are stored in the core Rya tables."); + deleteWarningPrinted = true; + } + } + + @Override + public void flush() { + // The Fluo application does not do any batching, so this doesn't do anything. + } + + /** + * Closes the underlying {@link FluoClient}. + */ + @Override + public void close() { + fluoClient.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/14073a23/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java new file mode 100644 index 0000000..4378a4a --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/fluo/FluoPcjUpdaterConfig.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.indexing.external.fluo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Optional; + +import mvm.rya.indexing.accumulo.ConfigUtils; + +/** + * Configuration values required to initialize a {@link FluoPcjUpdater}. + */ +public final class FluoPcjUpdaterConfig { + + // Defines which Fluo application is running for this instance of Rya. + public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + + // Values that define which Accumulo instance hosts the Fluo application's table. + public static final String ACCUMULO_ZOOKEEPERS = ConfigUtils.CLOUDBASE_ZOOKEEPERS; + public static final String ACCUMULO_INSTANCE = ConfigUtils.CLOUDBASE_INSTANCE; + public static final String ACCUMULO_USERNAME = ConfigUtils.CLOUDBASE_USER; + public static final String ACCUMULO_PASSWORD = ConfigUtils.CLOUDBASE_PASSWORD; + + // Values that define the visibilities associated with statements that are inserted by the fluo updater. + // NOTE(review): this reuses the Accumulo authorizations property as the visibility label - confirm intended. + public static final String STATEMENT_VISIBILITY = ConfigUtils.CLOUDBASE_AUTHS; + + // The configuration object that is provided to Secondary Indexing implementations. + private final Configuration config; + + /** + * Constructs an instance of {@link FluoPcjUpdaterConfig}. + * + * @param config - The configuration values that will be interpreted.
(not null) + */ + public FluoPcjUpdaterConfig(final Configuration config) { + this.config = checkNotNull(config); + } + + /** + * @return The name of the Fluo Application this instance of RYA is + * using to incrementally update PCJs. + */ + public Optional<String> getFluoAppName() { + return Optional.fromNullable(config.get(FLUO_APP_NAME)); + } + + /** + * This value is the {@link #getAccumuloZookeepers()} value appended with the + * "/fluo" namespace. + * + * @return The zookeepers that are used to manage Fluo state, or absent + * if the Accumulo zookeepers are not configured. + */ + public Optional<String> getFluoZookeepers() { + final Optional<String> accumuloZookeepers = getAccumuloZookeepers(); + if(!accumuloZookeepers.isPresent()) { + return Optional.absent(); + } + return Optional.of( accumuloZookeepers.get() + "/fluo" ); + } + + /** + * @return The zookeepers used to connect to the Accumulo instance that + * is storing the state of the Fluo Application. + */ + public Optional<String> getAccumuloZookeepers() { + return Optional.fromNullable(config.get(ACCUMULO_ZOOKEEPERS)); + } + + /** + * @return The instance name of the Accumulo instance that is storing + * the state of the Fluo Application. + */ + public Optional<String> getAccumuloInstance() { + return Optional.fromNullable(config.get(ACCUMULO_INSTANCE)); + } + + /** + * @return The username the indexer will authenticate with when connecting + * to the Accumulo instance that stores the state of the Fluo Application. + */ + public Optional<String> getAccumuloUsername() { + return Optional.fromNullable(config.get(ACCUMULO_USERNAME)); + } + + /** + * @return The password the indexer will authenticate with when connecting + * to the Accumulo instance that stores the state of the Fluo Application.
+ */ + public Optional<String> getAccumuloPassword() { + return Optional.fromNullable(config.get(ACCUMULO_PASSWORD)); + } + + /** + * @return The visibility labels that will be attached to the statements + * that are inserted into the Fluo Application. + */ + public Optional<String> getStatementVisibility() { + return Optional.fromNullable(config.get(STATEMENT_VISIBILITY)); + } +} \ No newline at end of file
